converge/pkg/server/admin/admin.go
Erik Brakkee fc7977f7bb now using maps of Guid to Agent/Client in the state, working towards the definitive solution.
Using LinkedMap that preserves insertion order for the implementation and also added unit tests for that.
2024-09-08 11:16:49 +02:00

241 lines
6.5 KiB
Go

package admin
import (
"fmt"
"git.wamblee.org/converge/pkg/comms"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/support/concurrency"
iowrappers2 "git.wamblee.org/converge/pkg/support/iowrappers"
"io"
"log"
"math/rand"
"net"
"strconv"
"sync"
"time"
)
// agentConnection couples the metadata of a registered agent (the embedded
// models.Agent) with the communication channel that newAgent stores for it.
type agentConnection struct {
	// Agent metadata as filled in by newAgent (Guid, RemoteAddr, PublicId,
	// StartTime, EnvironmentInfo).
	models.Agent
	// Communication channel to the agent. Its Session is used in this file to
	// read the remote address, to open connections for clients and to close
	// the session when the agent is removed.
	CommChannel comms.CommChannel
}
// Package-level counters obtained from concurrency.NewAtomicCounter.
// clientIdGenerator supplies the ClientId values assigned in newClient;
// agentIdGenerator is not referenced by the code visible in this file.
var (
	agentIdGenerator  = concurrency.NewAtomicCounter()
	clientIdGenerator = concurrency.NewAtomicCounter()
)
// ClientConnection couples the metadata of a connected client (the embedded
// models.Client) with the two connections that Synchronize links together.
type ClientConnection struct {
	// Client metadata as filled in by newClient (Guid, RemoteAddr, PublicId,
	// AgentGuid, ClientId, StartTime).
	models.Client
	// Connection to the agent, obtained in AddClient through getAgentConnection.
	agentConnection net.Conn
	// Connection to the client, as passed to AddClient.
	clientConnection iowrappers2.ReadWriteAddrCloser
}
// newAgent builds the bookkeeping record for an agent that registered under
// publicId.
//
// The Guid is the decimal rendering of a math/rand integer, the remote address
// is taken from the session of commChannel and the start time is the moment of
// the call. The returned record keeps commChannel for later use.
func newAgent(commChannel comms.CommChannel, publicId models.RendezVousId, agentInfo comms.EnvironmentInfo) *agentConnection {
	guid := models.AgentGuid(strconv.Itoa(rand.Int()))
	remoteAddr := models.RemoteAddr(commChannel.Session.RemoteAddr().String())

	result := new(agentConnection)
	result.Agent = models.Agent{
		Guid:            guid,
		RemoteAddr:      remoteAddr,
		PublicId:        publicId,
		StartTime:       time.Now(),
		EnvironmentInfo: agentInfo,
	}
	result.CommChannel = commChannel
	return result
}
// newClient builds the bookkeeping record for a client that connected to the
// agent identified by agentGuid using rendez-vous id publicId.
//
// The Guid is the decimal rendering of a math/rand integer, the ClientId is the
// next value of clientIdGenerator, the remote address is taken from clientConn
// and the start time is the moment of the call. Both connections are stored in
// the returned record.
func newClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser,
	agentConn net.Conn, agentGuid models.AgentGuid) *ClientConnection {
	guid := models.ClientGuid(strconv.Itoa(rand.Int()))
	remoteAddr := models.RemoteAddr(clientConn.RemoteAddr().String())
	clientId := models.ClientId(strconv.Itoa(clientIdGenerator.IncrementAndGet()))

	result := new(ClientConnection)
	result.Client = models.Client{
		Guid:       guid,
		RemoteAddr: remoteAddr,
		PublicId:   publicId,
		AgentGuid:  agentGuid,
		ClientId:   clientId,
		StartTime:  time.Now(),
	}
	result.agentConnection = agentConn
	result.clientConnection = clientConn
	return result
}
// Synchronize passes the client connection and the agent connection of this
// record to iowrappers2.SynchronizeStreams under the label "client -- agent".
// NOTE(review): presumably this relays data between both sides until one of
// them ends — confirm against SynchronizeStreams.
func (c *ClientConnection) Synchronize() {
	const label = "client -- agent"
	iowrappers2.SynchronizeStreams(label, c.clientConnection, c.agentConnection)
}
// Admin keeps track of the registered agents and the connected clients. The
// exported methods lock mutex before reading or modifying agents and clients.
type Admin struct {
	// guards agents and clients
	mutex sync.Mutex
	// map of public id to agent
	agents map[models.RendezVousId]*agentConnection
	// connected clients, in the order in which AddClient appended them
	clients []*ClientConnection
}
// NewAdmin returns an Admin without any agents or clients.
func NewAdmin() *Admin {
	// The zero value of the mutex is an unlocked mutex, so only the
	// collections need explicit initialization.
	a := new(Admin)
	a.agents = make(map[models.RendezVousId]*agentConnection)
	a.clients = make([]*ClientConnection, 0) // not strictly needed
	return a
}
// CreateNotifification returns a new models.State describing all currently
// registered agents and clients, keyed by their Guid.
//
// The state receives pointers to the Agent and Client values held by the
// admin, not copies. Agents are added in map iteration order, which Go leaves
// unspecified; clients are added in slice order.
func (admin *Admin) CreateNotifification() *models.State {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	snapshot := models.NewState()
	for _, registered := range admin.agents {
		snapshot.Agents.Put(registered.Guid, &registered.Agent)
	}
	for i := range admin.clients {
		connected := admin.clients[i]
		snapshot.Clients.Put(connected.Guid, &connected.Client)
	}
	return snapshot
}
// getFreeId picks the public id under which a new agent can be registered.
//
// It returns publicId itself when no registered agent uses it. Otherwise it
// returns the first of "<publicId>-0" ... "<publicId>-99" that is unused, or
// an error when all of those are taken as well.
//
// The method does not lock the mutex itself; AddAgent calls it while holding
// the lock.
func (admin *Admin) getFreeId(publicId models.RendezVousId) (models.RendezVousId, error) {
	taken := make(map[models.RendezVousId]bool, len(admin.agents))
	for _, registered := range admin.agents {
		taken[registered.PublicId] = true
	}

	// Requested id is free: use it as is.
	if !taken[publicId] {
		return publicId, nil
	}

	// Requested id is in use: try numbered alternatives.
	prefix := string(publicId) + "-"
	for suffix := 0; suffix < 100; suffix++ {
		alternative := models.RendezVousId(prefix + strconv.Itoa(suffix))
		if taken[alternative] {
			continue
		}
		return alternative, nil
	}
	return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId)
}
// AddAgent registers a new agent that connected over conn.
//
// The agent is registered under publicId when that id is free, otherwise under
// an alternative id allocated by getFreeId. The outcome is reported to the
// agent with a registration message on conn. After a successful registration
// message a communication channel is set up on conn and the agent is stored
// under its (possibly newly allocated) public id.
//
// It returns the registered agent, or an error when no id could be allocated
// or the communication channel could not be created. In the latter case the
// agent has already been told that its registration was accepted.
func (admin *Admin) AddAgent(publicId models.RendezVousId, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*agentConnection, error) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	newPublicId, err := admin.getFreeId(publicId)
	if err != nil {
		// Tell the agent that it was rejected and stop here. Continuing would
		// hide the real cause behind the internal consistency error below.
		comms.SendRegistrationMessage(conn, comms.AgentRegistration{
			Ok:      false,
			Message: err.Error(),
		})
		return nil, err
	}

	message := "Requested id is accepted"
	if publicId != newPublicId {
		message = "The server allocated a new id."
	}
	publicId = newPublicId
	// NOTE(review): if SendRegistrationMessage can report a failure, it is not
	// checked here — confirm against its signature.
	comms.SendRegistrationMessage(conn, comms.AgentRegistration{
		Ok:      true,
		Message: message,
		Id:      string(publicId),
	})

	// getFreeId only hands out unused ids, so this is a pure sanity check.
	agent := admin.agents[publicId]
	if agent != nil {
		return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId)
	}

	commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
	if err != nil {
		return nil, err
	}
	agent = newAgent(commChannel, publicId, agentInfo)
	admin.agents[publicId] = agent
	return agent, nil
}
// AddClient connects a new client to the agent registered under publicId.
//
// It opens a connection to the agent, sends the client metadata (the client id)
// over that connection and records the client. The returned ClientConnection
// holds both clientConn and the connection to the agent.
//
// It returns an error when no agent is registered under publicId, when no
// connection to the agent could be opened, or when the client metadata could
// not be sent. In the last case the connection to the agent is closed again.
// clientConn is never closed here.
//
// NOTE(review): the admin mutex stays locked while getAgentConnection retries,
// which can take several seconds and blocks all other Admin methods meanwhile.
func (admin *Admin) AddClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser) (*ClientConnection, error) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	agent := admin.agents[publicId]
	if agent == nil {
		// we should set up on-demand connections to agents later.
		return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId)
	}

	agentConn, err := admin.getAgentConnection(agent)
	if err != nil {
		return nil, err
	}
	log.Println("Successful websocket connection to agent")
	log.Println("Sending connection information to agent")

	client := newClient(publicId, clientConn, agentConn, agent.Guid)

	// Before using this connection for SSH we use it to send client metadata to the
	// agent
	err = comms.SendClientInfo(agentConn, comms.ClientInfo{
		ClientId: string(client.ClientId),
	})
	if err != nil {
		// Nobody else holds a reference to the agent connection, so it must be
		// released here. Best effort: the send error is the one worth reporting.
		_ = agentConn.Close()
		return nil, err
	}

	admin.clients = append(admin.clients, client)
	return client, nil
}
// getAgentConnection opens a new connection to the agent on the session of its
// communication channel.
//
// When opening fails, it is retried up to 10 more times with a pause of 250ms
// before each retry. It returns the result of the last attempt: the connection
// on success, or the error of the final failed attempt.
func (admin *Admin) getAgentConnection(agent *agentConnection) (net.Conn, error) {
	const (
		maxRetries = 10
		retryDelay = 250 * time.Millisecond
	)

	conn, err := agent.CommChannel.Session.Open()
	for retry := 0; retry < maxRetries && err != nil; retry++ {
		log.Printf("Retrying connection to agent: %v", err)
		time.Sleep(retryDelay)
		conn, err = agent.CommChannel.Session.Open()
	}
	return conn, err
}
// RemoveAgent closes the session of the agent registered under publicId and
// removes the agent from the administration.
//
// A failure to close the session is only logged; the agent is removed anyway.
// An error is returned only when no agent is registered under publicId.
func (admin *Admin) RemoveAgent(publicId models.RendezVousId) error {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	registered := admin.agents[publicId]
	if registered == nil {
		return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
	}

	log.Printf("Removing agent: '%s'", publicId)
	if closeErr := registered.CommChannel.Session.Close(); closeErr != nil {
		log.Printf("Could not close yamux client session for '%s'\n", publicId)
	}
	delete(admin.agents, publicId)
	return nil
}
// RemoveClient closes both connections of client and removes the first
// recorded client with the same ClientId from the administration.
//
// Errors from closing the connections are ignored on purpose: the connections
// may already be closed by the time the client is removed. The connections are
// closed even when the client is not (or no longer) recorded. The method
// always returns nil.
func (admin *Admin) RemoveClient(client *ClientConnection) error {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	log.Printf("Removing client: '%s' created at %s\n", client.ClientId,
		client.StartTime.Format(time.DateTime))

	// try to explicitly close connection to the agent.
	_ = client.agentConnection.Close()
	_ = client.clientConnection.Close()

	for i, candidate := range admin.clients {
		if candidate.ClientId != client.ClientId {
			continue
		}
		// Shift the tail down, then clear the vacated last slot so that the
		// backing array no longer references the removed client.
		last := len(admin.clients) - 1
		copy(admin.clients[i:], admin.clients[i+1:])
		admin.clients[last] = nil
		admin.clients = admin.clients[:last]
		break
	}
	return nil
}
// SetSessionType stores sessionType on the first recorded client whose
// ClientId equals clientId. Nothing happens when no such client is recorded.
func (admin *Admin) SetSessionType(clientId models.ClientId, sessionType models.SessionType) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()

	for i := range admin.clients {
		if current := admin.clients[i]; current.ClientId == clientId {
			current.SessionType = sessionType
			return
		}
	}
}