converge/pkg/converge/admin.go

255 lines
6.1 KiB
Go

package converge
import (
"converge/pkg/comms"
"converge/pkg/concurrency"
"converge/pkg/iowrappers"
"fmt"
"io"
"log"
"net"
"strconv"
"sync"
"time"
)
type Agent struct {
// server session
commChannel comms.CommChannel
publicId string
startTime time.Time
agentInfo comms.AgentInfo
expiryTime time.Time
}
var clientIdGenerator = concurrency.NewAtomicCounter()
type Client struct {
publicId string
clientId int
agent net.Conn
client iowrappers.ReadWriteAddrCloser
startTime time.Time
sessionType string
}
func NewAgent(commChannel comms.CommChannel, publicId string, agentInfo comms.AgentInfo) *Agent {
return &Agent{
commChannel: commChannel,
publicId: publicId,
startTime: time.Now(),
agentInfo: agentInfo,
}
}
func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser,
agentConn net.Conn) *Client {
return &Client{
publicId: publicId,
clientId: clientIdGenerator.IncrementAndGet(),
agent: agentConn,
client: clientConn,
startTime: time.Now(),
}
}
type Admin struct {
// map of public id to agent
mutex sync.Mutex
agents map[string]*Agent
clients []*Client
}
func NewAdmin() *Admin {
admin := Admin{
mutex: sync.Mutex{},
agents: make(map[string]*Agent),
clients: make([]*Client, 0), // not strictly needed
}
return &admin
}
func (admin *Admin) logStatus() {
fmt := "%-20s %-20s %-20s %-10s %-15s %-20s\n"
log.Printf(fmt, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
"USER", "HOST", "OS")
for _, agent := range admin.agents {
agent.commChannel.Session.RemoteAddr()
log.Printf(fmt, agent.publicId,
agent.startTime.Format(time.DateTime),
agent.expiryTime.Format(time.DateTime),
agent.agentInfo.Username,
agent.agentInfo.Hostname,
agent.agentInfo.OS)
}
log.Println("")
fmt = "%-10s %-20s %-20s %-20s %-20s\n"
log.Printf(fmt, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")
for _, client := range admin.clients {
log.Printf(fmt,
strconv.Itoa(client.clientId),
client.publicId,
client.startTime.Format(time.DateTime),
client.client.RemoteAddr(),
client.sessionType)
}
log.Printf("\n")
}
func (admin *Admin) addAgent(publicId string, agentInfo comms.AgentInfo, conn io.ReadWriteCloser) (*Agent, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent != nil {
return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId)
}
commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
if err != nil {
return nil, err
}
agent = NewAgent(commChannel, publicId, agentInfo)
admin.agents[publicId] = agent
admin.logStatus()
return agent, nil
}
func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser) (*Client, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
// we should setup on-demend connections ot agents later.
return nil, fmt.Errorf("No agent found for publicId '%s'", publicId)
}
agentConn, err := agent.commChannel.Session.Open()
if err != nil {
return nil, err
}
log.Println("Successful websocket connection to agent")
log.Println("Sending connection information to agent")
client := NewClient(publicId, clientConn, agentConn)
// Before using this connection for SSH we use it to send client metadata to the
// agent
err = comms.SendClientInfo(agentConn, comms.ClientInfo{
ClientId: client.clientId,
})
if err != nil {
return nil, err
}
admin.clients = append(admin.clients, client)
admin.logStatus()
return client, nil
}
func (admin *Admin) RemoveAgent(publicId string) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
}
log.Printf("Removing agent: '%s'", publicId)
err := agent.commChannel.Session.Close()
if err != nil {
log.Printf("Could not close yamux client session for '%s'\n", publicId)
}
delete(admin.agents, publicId)
admin.logStatus()
return nil
}
func (admin *Admin) RemoveClient(client *Client) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
log.Printf("Removing client: '%s' created at %s\n", client.publicId,
client.startTime.Format("2006-01-02 15:04:05"))
// try to explicitly close connection to the agent.
_ = client.agent.Close()
_ = client.client.Close()
for i, _client := range admin.clients {
if _client == _client {
admin.clients = append(admin.clients[:i], admin.clients[i+1:]...)
break
}
}
admin.logStatus()
return nil
}
func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser,
userPassword comms.UserPassword) error {
defer conn.Close()
serverInfo := comms.ServerInfo{
UserPassword: userPassword,
}
agentInfo, err := comms.ServerInitialization(conn, serverInfo)
if err != nil {
return err
}
agent, err := admin.addAgent(publicId, agentInfo, conn)
if err != nil {
return err
}
defer func() {
admin.RemoveAgent(publicId)
}()
go func() {
comms.ListenForAgentEvents(agent.commChannel.SideChannel,
func(info comms.AgentInfo) {
agent.agentInfo = info
admin.logStatus()
},
func(session comms.SessionInfo) {
for _, client := range admin.clients {
// a bit hacky. There should be at most one client that has an unset session
// Very unlikely for multiple sessions to start at the same point in time.
if client.publicId == agent.publicId && client.sessionType != session.SessionType {
client.sessionType = session.SessionType
break
}
}
},
func(expiry comms.ExpiryTimeUpdate) {
agent.expiryTime = expiry.ExpiryTime
admin.logStatus()
})
}()
go log.Printf("Agent registered: '%s'\n", publicId)
for !agent.commChannel.Session.IsClosed() {
time.Sleep(250 * time.Millisecond)
}
return nil
}
func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser) error {
defer conn.Close()
client, err := admin.addClient(publicId, conn)
if err != nil {
return err
}
defer func() {
admin.RemoveClient(client)
}()
log.Printf("Connecting client and agent: '%s'\n", publicId)
iowrappers.SynchronizeStreams(client.client, client.agent)
return nil
}