// Package admin tracks the agents registered with the server and the clients
// connected to those agents.
package admin

import (
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"time"

	"git.wamblee.org/converge/pkg/comms"
	"git.wamblee.org/converge/pkg/models"
	"git.wamblee.org/converge/pkg/support/concurrency"
	iowrappers2 "git.wamblee.org/converge/pkg/support/iowrappers"
)

// agentConnection couples the reported information about an agent with the
// communication channel that was set up with it.
type agentConnection struct {
	Info *models.Agent
	// server session
	CommChannel comms.CommChannel
}

// clientIdGenerator hands out the numeric part of every new client id.
var clientIdGenerator = concurrency.NewAtomicCounter()

// clientConnection couples the reported information about a client with the
// two connections that are bridged for it: the one to the agent and the one
// to the client itself.
type clientConnection struct {
	Info             *models.Client
	agentConnection  net.Conn
	clientConnection iowrappers2.ReadWriteAddrCloser
}

// newAgent builds the agentConnection for an agent that registered under
// publicId. The agent gets a random Guid and the current time as start time.
func newAgent(commChannel comms.CommChannel, publicId models.RendezVousId,
	agentInfo comms.EnvironmentInfo) *agentConnection {
	agent := models.Agent{
		Guid:            models.AgentGuid(strconv.Itoa(rand.Int())),
		RemoteAddr:      models.RemoteAddr(commChannel.Session.RemoteAddr().String()),
		PublicId:        publicId,
		StartTime:       time.Now(),
		EnvironmentInfo: agentInfo,
	}
	return &agentConnection{
		Info:        &agent,
		CommChannel: commChannel,
	}
}

// newClient builds the clientConnection for a client that connects to the
// agent identified by publicId/agentGuid. The client gets a random Guid, the
// next id from clientIdGenerator and the current time as start time.
func newClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser,
	agentConn net.Conn, agentGuid models.AgentGuid) *clientConnection {
	client := models.Client{
		Guid:       models.ClientGuid(strconv.Itoa(rand.Int())),
		RemoteAddr: models.RemoteAddr(clientConn.RemoteAddr().String()),
		PublicId:   publicId,
		AgentGuid:  agentGuid,
		ClientId:   models.ClientId(strconv.Itoa(clientIdGenerator.IncrementAndGet())),
		StartTime:  time.Now(),
	}
	return &clientConnection{
		Info:             &client,
		agentConnection:  agentConn,
		clientConnection: clientConn,
	}
}

// Synchronize hands the client and agent connections to
// iowrappers2.SynchronizeStreams.
func (match *clientConnection) Synchronize() {
	iowrappers2.SynchronizeStreams("client -- agent", match.clientConnection, match.agentConnection)
}

// Admin is the registry of agents and clients. All methods lock mutex before
// touching the fields below.
type Admin struct {
	mutex sync.Mutex

	// for reporting state to webclients and prometheus and also used for
	// logging the state. This uses copy-on-write. Every time an agent or
	// client is added or removed a copy is made.
	state *models.State

	// TODO: use linked map for both of these

	// map of public id to agent
	agents map[models.RendezVousId]*agentConnection
	// map of client id to client
	clients map[models.ClientId]*clientConnection
}

// NewAdmin returns an Admin without any agents or clients.
func NewAdmin() *Admin {
	return &Admin{
		state:   models.NewState(),
		agents:  make(map[models.RendezVousId]*agentConnection),
		clients: make(map[models.ClientId]*clientConnection),
	}
}

// Close closes the connections of all clients and the sessions of all agents.
// The registry itself is left untouched.
func (admin *Admin) Close() {
	// The maps are modified by the Add*/Remove* methods under the mutex, so
	// iterating them requires the mutex as well.
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	// Closing is best effort: there is nothing useful to do with a close
	// error here.
	for _, client := range admin.clients {
		_ = client.clientConnection.Close()
		_ = client.agentConnection.Close()
	}
	for _, agent := range admin.agents {
		_ = agent.CommChannel.Session.Close()
	}
}

// CreateNotifification returns the current state snapshot.
func (admin *Admin) CreateNotifification() *models.State {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()
	return admin.state
}

// getFreeId returns publicId when no agent is registered under it. Otherwise
// it tries the candidates "<publicId>-0" up to "<publicId>-99" and returns the
// first one that is free, or an error when all of them are taken.
// The caller must hold admin.mutex.
func (admin *Admin) getFreeId(publicId models.RendezVousId) (models.RendezVousId, error) {
	if admin.agents[publicId] == nil {
		return publicId, nil
	}
	for i := range 100 {
		candidate := models.RendezVousId(string(publicId) + "-" + strconv.Itoa(i))
		if admin.agents[candidate] == nil {
			return candidate, nil
		}
	}
	return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId)
}

// AddAgent registers an agent that asked for publicId. When that id is taken a
// derived id is allocated instead. The outcome is sent to the agent over conn
// as a registration message; on success a communication channel is then set up
// over conn and the agent is added to the registry and the reported state.
//
// It returns the registered agent, or an error when no id could be allocated,
// the registration message could not be sent, or the communication channel
// could not be set up.
func (admin *Admin) AddAgent(hostKey []byte, publicId models.RendezVousId,
	agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*agentConnection, error) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	newPublicId, err := admin.getFreeId(publicId)
	if err != nil {
		// Telling the agent is best effort; the allocation failure is what
		// gets reported to the caller.
		sendErr := comms.SendRegistrationMessage(conn, comms.AgentRegistration{
			Ok:      false,
			Message: err.Error(),
		})
		if sendErr != nil {
			log.Printf("Could not send registration failure to agent '%s': %v", publicId, sendErr)
		}
		return nil, fmt.Errorf(
			"Agent requested id '%s' which is already taken and could not allocate a new unique id: %w",
			publicId, err)
	}

	message := "Requested id is accepted"
	if publicId != newPublicId {
		message = "The server allocated a new id."
	}
	publicId = newPublicId
	err = comms.SendRegistrationMessage(conn, comms.AgentRegistration{
		Ok:             true,
		Message:        message,
		Id:             string(publicId),
		HostPrivateKey: hostKey,
	})
	if err != nil {
		return nil, err
	}

	// Defensive check: getFreeId only returns ids that are not in use.
	if admin.agents[publicId] != nil {
		return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId)
	}

	commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
	if err != nil {
		return nil, err
	}

	agent := newAgent(commChannel, publicId, agentInfo)
	admin.state = admin.state.Copy()
	admin.state.Agents[agent.Info.Guid] = agent.Info
	admin.agents[publicId] = agent
	return agent, nil
}

// AddClient connects a new client to the agent registered under publicId. It
// opens a connection to the agent, sends the client id over it and then adds
// the client to the registry and the reported state.
//
// It returns an error when no such agent is registered, when no connection to
// the agent could be opened, or when the client id could not be sent.
//
// NOTE(review): the admin mutex is held while getAgentConnection retries
// (up to 10 sleeps of 250ms), which blocks all other Admin methods for that
// time.
func (admin *Admin) AddClient(publicId models.RendezVousId,
	clientConn iowrappers2.ReadWriteAddrCloser) (*clientConnection, error) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	agent := admin.agents[publicId]
	if agent == nil {
		// we should setup on-demand connections to agents later.
		return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId)
	}

	agentConn, err := admin.getAgentConnection(agent)
	if err != nil {
		return nil, err
	}
	log.Println("Successful websocket connection to agent")

	client := newClient(publicId, clientConn, agentConn, agent.Info.Guid)

	// Before using this connection for SSH we use it to send client metadata to the
	// agent
	err = comms.SendClientInfo(agentConn, string(client.Info.ClientId))
	if err != nil {
		// The client is never registered, so nobody else would close the
		// connection that was just opened for it.
		_ = agentConn.Close()
		return nil, err
	}

	admin.state = admin.state.Copy()
	admin.state.Clients[client.Info.Guid] = client.Info
	admin.clients[client.Info.ClientId] = client
	return client, nil
}

// getAgentConnection opens a new connection on the agent's session. A failed
// attempt is retried up to 10 times with 250ms between attempts; the result of
// the last attempt is returned.
func (admin *Admin) getAgentConnection(agent *agentConnection) (net.Conn, error) {
	agentConn, err := agent.CommChannel.Session.Open()
	for count := 0; err != nil && count < 10; count++ {
		log.Printf("Retrying connection to agent: %v", err)
		time.Sleep(250 * time.Millisecond)
		agentConn, err = agent.CommChannel.Session.Open()
	}
	return agentConn, err
}

// RemoveAgent closes the session of the agent registered under publicId and
// removes the agent from the registry and the reported state. It returns an
// error when no such agent is registered. A failure to close the session is
// only logged.
func (admin *Admin) RemoveAgent(publicId models.RendezVousId) error {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	agent := admin.agents[publicId]
	if agent == nil {
		return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
	}
	log.Printf("Removing agent: '%s'", publicId)
	if err := agent.CommChannel.Session.Close(); err != nil {
		log.Printf("Could not close yamux client session for '%s': %v\n", publicId, err)
	}

	admin.state = admin.state.Copy()
	delete(admin.state.Agents, agent.Info.Guid)
	delete(admin.agents, publicId)
	return nil
}

// RemoveClient closes both connections of client and removes it from the
// registry and the reported state. It always returns nil.
func (admin *Admin) RemoveClient(client *clientConnection) error {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	log.Printf("Removing client: '%s' created at %s\n", client.Info.Guid,
		client.Info.StartTime.Format(time.DateTime))

	// try to explicitly close connection to the agent.
	_ = client.agentConnection.Close()
	_ = client.clientConnection.Close()

	admin.state = admin.state.Copy()
	delete(admin.state.Clients, client.Info.Guid)
	delete(admin.clients, client.Info.ClientId)
	return nil
}

// SetSessionType records sessionType on the client with the given clientId in
// the reported state. Unknown client ids are ignored.
//
// NOTE(review): this modifies a client that is reachable from the state
// snapshot that CreateNotifification may already have handed out, without
// making a copy first. Whether that is a problem depends on how deep
// models.State.Copy copies and on how snapshots are consumed - confirm.
func (admin *Admin) SetSessionType(clientId models.ClientId, sessionType models.SessionType) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	for _, client := range admin.state.Clients {
		if client.ClientId == clientId {
			client.SessionType = sessionType
			break
		}
	}
}