package admin import ( "fmt" "git.wamblee.org/converge/pkg/comms" "git.wamblee.org/converge/pkg/models" "git.wamblee.org/converge/pkg/support/concurrency" iowrappers2 "git.wamblee.org/converge/pkg/support/iowrappers" "io" "log" "math/rand" "net" "strconv" "sync" "time" ) type agentConnection struct { models.Agent // server session CommChannel comms.CommChannel } var agentIdGenerator = concurrency.NewAtomicCounter() var clientIdGenerator = concurrency.NewAtomicCounter() type ClientConnection struct { models.Client agentConnection net.Conn clientConnection iowrappers2.ReadWriteAddrCloser } func newAgent(commChannel comms.CommChannel, publicId string, agentInfo comms.EnvironmentInfo) *agentConnection { return &agentConnection{ Agent: models.Agent{ Guid: strconv.Itoa(rand.Int()), RemoteAddr: commChannel.Session.RemoteAddr().String(), PublicId: publicId, StartTime: time.Now(), EnvironmentInfo: agentInfo, }, CommChannel: commChannel, } } func newClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser, agentConn net.Conn, agentGuid string) *ClientConnection { return &ClientConnection{ Client: models.Client{ Guid: strconv.Itoa(rand.Int()), RemoteAddr: clientConn.RemoteAddr().String(), PublicId: publicId, AgentGuid: agentGuid, ClientId: strconv.Itoa(clientIdGenerator.IncrementAndGet()), StartTime: time.Now(), }, agentConnection: agentConn, clientConnection: clientConn, } } func (match *ClientConnection) Synchronize() { iowrappers2.SynchronizeStreams("client -- agent", match.clientConnection, match.agentConnection) } type Admin struct { // map of public id to agent mutex sync.Mutex agents map[string]*agentConnection clients []*ClientConnection } func NewAdmin() *Admin { return &Admin{ mutex: sync.Mutex{}, agents: make(map[string]*agentConnection), clients: make([]*ClientConnection, 0), // not strictly needed } } func (admin *Admin) CreateNotifification() *models.State { admin.mutex.Lock() defer admin.mutex.Unlock() state := models.State{} state.Agents = make([]models.Agent, 0, len(admin.agents)) state.Clients = make([]models.Client, 0, len(admin.clients)) for _, agent := range admin.agents { state.Agents = append(state.Agents, agent.Agent) } for _, client := range admin.clients { state.Clients = append(state.Clients, client.Client) } return &state } func (admin *Admin) getFreeId(publicId string) (string, error) { usedIds := make(map[string]bool) for _, agent := range admin.agents { usedIds[agent.PublicId] = true } if !usedIds[publicId] { return publicId, nil } if usedIds[publicId] { for i := 0; i < 100; i++ { candidate := publicId + "-" + strconv.Itoa(i) if !usedIds[candidate] { return candidate, nil } } } return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId) } func (admin *Admin) AddAgent(publicId string, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*agentConnection, error) { admin.mutex.Lock() defer admin.mutex.Unlock() newPublicId, err := admin.getFreeId(publicId) if err == nil { message := "Requested id is accepted" if publicId != newPublicId { message = "The server allocated a new id." } publicId = newPublicId comms.SendRegistrationMessage(conn, comms.AgentRegistration{ Ok: true, Message: message, Id: publicId, }) } else { comms.SendRegistrationMessage(conn, comms.AgentRegistration{ Ok: false, Message: err.Error(), }) } agent := admin.agents[publicId] if agent != nil { return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId) } commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn) if err != nil { return nil, err } agent = newAgent(commChannel, publicId, agentInfo) admin.agents[publicId] = agent return agent, nil } func (admin *Admin) AddClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser) (*ClientConnection, error) { admin.mutex.Lock() defer admin.mutex.Unlock() agent := admin.agents[publicId] if agent == nil { // we should setup on-demend connections ot agents later. return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId) } agentConn, err := admin.getAgentConnection(agent) if err != nil { return nil, err } log.Println("Successful websocket connection to agent") log.Println("Sending connection information to agent") client := newClient(publicId, clientConn, agentConn, agent.Guid) // Before using this connection for SSH we use it to send client metadata to the // agent err = comms.SendClientInfo(agentConn, comms.ClientInfo{ ClientId: client.ClientId, }) if err != nil { return nil, err } admin.clients = append(admin.clients, client) return client, nil } func (admin *Admin) getAgentConnection(agent *agentConnection) (net.Conn, error) { agentConn, err := agent.CommChannel.Session.Open() count := 0 for err != nil && count < 10 { log.Printf("Retrying connection to agent: %v", err) time.Sleep(250 * time.Millisecond) count++ agentConn, err = agent.CommChannel.Session.Open() } return agentConn, err } func (admin *Admin) RemoveAgent(publicId string) error { admin.mutex.Lock() defer admin.mutex.Unlock() agent := admin.agents[publicId] if agent == nil { return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) } log.Printf("Removing agent: '%s'", publicId) err := agent.CommChannel.Session.Close() if err != nil { log.Printf("Could not close yamux client session for '%s'\n", publicId) } delete(admin.agents, publicId) return nil } func (admin *Admin) RemoveClient(client *ClientConnection) error { admin.mutex.Lock() defer admin.mutex.Unlock() log.Printf("Removing client: '%s' created at %s\n", client.ClientId, client.StartTime.Format(time.DateTime)) // try to explicitly close connection to the agent. _ = client.agentConnection.Close() _ = client.clientConnection.Close() for i, _client := range admin.clients { if _client.ClientId == client.ClientId { admin.clients = append(admin.clients[:i], admin.clients[i+1:]...) break } } return nil } func (admin *Admin) SetSessionType(clientId string, sessionType string) { admin.mutex.Lock() defer admin.mutex.Unlock() for _, client := range admin.clients { if client.ClientId == clientId { client.SessionType = sessionType break } } }