converge/pkg/server/admin/admin.go
Erik Brakkee 7d25f39f5b test for connecting clients and bidirectional communication to agent.
Required lots of rework since the GOBChannel appeared to be reading
ahead of the data it actually needed. Now using more low-level IO
to send the clientId over to the agent instead.
2024-08-22 16:16:02 +02:00

241 lines
6.9 KiB
Go

package admin
import (
"fmt"
"git.wamblee.org/converge/pkg/comms"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/support/concurrency"
iowrappers2 "git.wamblee.org/converge/pkg/support/iowrappers"
"io"
"log"
"math/rand"
"net"
"strconv"
"sync"
"time"
)
type agentConnection struct {
Info *models.Agent
// server session
CommChannel comms.CommChannel
}
var clientIdGenerator = concurrency.NewAtomicCounter()
type clientConnection struct {
Info *models.Client
agentConnection net.Conn
clientConnection iowrappers2.ReadWriteAddrCloser
}
func newAgent(commChannel comms.CommChannel, publicId models.RendezVousId, agentInfo comms.EnvironmentInfo) *agentConnection {
agent := models.Agent{
Guid: models.AgentGuid(strconv.Itoa(rand.Int())),
RemoteAddr: models.RemoteAddr(commChannel.Session.RemoteAddr().String()),
PublicId: publicId,
StartTime: time.Now(),
EnvironmentInfo: agentInfo,
}
return &agentConnection{
Info: &agent,
CommChannel: commChannel,
}
}
func newClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser,
agentConn net.Conn, agentGuid models.AgentGuid) *clientConnection {
client := models.Client{
Guid: models.ClientGuid(strconv.Itoa(rand.Int())),
RemoteAddr: models.RemoteAddr(clientConn.RemoteAddr().String()),
PublicId: publicId,
AgentGuid: agentGuid,
ClientId: models.ClientId(strconv.Itoa(clientIdGenerator.IncrementAndGet())),
StartTime: time.Now(),
}
return &clientConnection{
Info: &client,
agentConnection: agentConn,
clientConnection: clientConn,
}
}
func (match *clientConnection) Synchronize() {
iowrappers2.SynchronizeStreams("client -- agent", match.clientConnection, match.agentConnection)
}
type Admin struct {
// map of public id to agent
mutex sync.Mutex
// for reporting state to webclients and prometheus and also used for
// logging the state. This uses copy-on-write. Every time an agent or
// clinet is added or removed a copy is made.
state *models.State
// TODO: use linked map for both of these
agents map[models.RendezVousId]*agentConnection
clients map[models.ClientId]*clientConnection
}
func NewAdmin() *Admin {
return &Admin{
mutex: sync.Mutex{},
state: models.NewState(),
agents: make(map[models.RendezVousId]*agentConnection),
clients: make(map[models.ClientId]*clientConnection),
}
}
func (admin *Admin) CreateNotifification() *models.State {
admin.mutex.Lock()
defer admin.mutex.Unlock()
return admin.state
}
func (admin *Admin) getFreeId(publicId models.RendezVousId) (models.RendezVousId, error) {
if admin.agents[publicId] == nil {
return publicId, nil
}
for i := range 100 {
candidate := models.RendezVousId(string(publicId) + "-" + strconv.Itoa(i))
if admin.agents[candidate] == nil {
return candidate, nil
}
}
return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId)
}
func (admin *Admin) AddAgent(hostKey []byte, publicId models.RendezVousId, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*agentConnection, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
newPublicId, err := admin.getFreeId(publicId)
if err == nil {
message := "Requested id is accepted"
if publicId != newPublicId {
message = "The server allocated a new id."
}
publicId = newPublicId
err := comms.SendRegistrationMessage(conn, comms.AgentRegistration{
Ok: true,
Message: message,
Id: string(publicId),
HostPrivateKey: hostKey,
})
if err != nil {
return nil, err
}
} else {
comms.SendRegistrationMessage(conn, comms.AgentRegistration{
Ok: false,
Message: err.Error(),
})
return nil, fmt.Errorf(
"Agent requested id '%s' which is already taken anc could not allocate a new unique id", publicId)
}
agentCheck := admin.agents[publicId]
if agentCheck != nil {
return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId)
}
commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
if err != nil {
return nil, err
}
agent := newAgent(commChannel, publicId, agentInfo)
admin.state = admin.state.Copy()
admin.state.Agents[agent.Info.Guid] = agent.Info
admin.agents[publicId] = agent
return agent, nil
}
func (admin *Admin) AddClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser) (*clientConnection, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
// we should setup on-demend connections ot agents later.
return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId)
}
agentConn, err := admin.getAgentConnection(agent)
if err != nil {
return nil, err
}
log.Println("Successful websocket connection to agent")
client := newClient(publicId, clientConn, agentConn, agent.Info.Guid)
// Before using this connection for SSH we use it to send client metadata to the
// agent
err = comms.SendClientInfo(agentConn, string(client.Info.ClientId))
if err != nil {
return nil, err
}
admin.state = admin.state.Copy()
admin.state.Clients[client.Info.Guid] = client.Info
admin.clients[client.Info.ClientId] = client
return client, nil
}
func (admin *Admin) getAgentConnection(agent *agentConnection) (net.Conn, error) {
agentConn, err := agent.CommChannel.Session.Open()
count := 0
for err != nil && count < 10 {
log.Printf("Retrying connection to agent: %v", err)
time.Sleep(250 * time.Millisecond)
count++
agentConn, err = agent.CommChannel.Session.Open()
}
return agentConn, err
}
func (admin *Admin) RemoveAgent(publicId models.RendezVousId) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
}
log.Printf("Removing agent: '%s'", publicId)
err := agent.CommChannel.Session.Close()
if err != nil {
log.Printf("Could not close yamux client session for '%s'\n", publicId)
}
admin.state = admin.state.Copy()
delete(admin.state.Agents, agent.Info.Guid)
delete(admin.agents, publicId)
return nil
}
func (admin *Admin) RemoveClient(client *clientConnection) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
log.Printf("Removing client: '%s' created at %s\n", client.Info.Guid,
client.Info.StartTime.Format(time.DateTime))
// try to explicitly close connection to the agent.
_ = client.agentConnection.Close()
_ = client.clientConnection.Close()
admin.state = admin.state.Copy()
delete(admin.state.Clients, client.Info.Guid)
delete(admin.clients, client.Info.ClientId)
return nil
}
func (admin *Admin) SetSessionType(clientId models.ClientId, sessionType models.SessionType) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
for _, client := range admin.state.Clients {
if client.ClientId == clientId {
client.SessionType = sessionType
break
}
}
}