251 lines
7.1 KiB
Go
251 lines
7.1 KiB
Go
package admin
|
|
|
|
import (
|
|
"fmt"
|
|
"git.wamblee.org/converge/pkg/comms"
|
|
"git.wamblee.org/converge/pkg/models"
|
|
"git.wamblee.org/converge/pkg/support/concurrency"
|
|
iowrappers2 "git.wamblee.org/converge/pkg/support/iowrappers"
|
|
"io"
|
|
"log"
|
|
"math/rand"
|
|
"net"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type agentConnection struct {
|
|
Info *models.Agent
|
|
// server session
|
|
CommChannel comms.CommChannel
|
|
}
|
|
|
|
// clientIdGenerator supplies the numbers that newClient converts into
// ClientId values (one IncrementAndGet call per new client).
var clientIdGenerator = concurrency.NewAtomicCounter()
|
|
|
|
type clientConnection struct {
|
|
Info *models.Client
|
|
agentConnection net.Conn
|
|
clientConnection iowrappers2.ReadWriteAddrCloser
|
|
}
|
|
|
|
func newAgent(commChannel comms.CommChannel, publicId models.RendezVousId, agentInfo comms.EnvironmentInfo) *agentConnection {
|
|
agent := models.Agent{
|
|
Guid: models.AgentGuid(strconv.Itoa(rand.Int())),
|
|
RemoteAddr: models.RemoteAddr(commChannel.Session.RemoteAddr().String()),
|
|
PublicId: publicId,
|
|
StartTime: time.Now(),
|
|
EnvironmentInfo: agentInfo,
|
|
}
|
|
return &agentConnection{
|
|
Info: &agent,
|
|
CommChannel: commChannel,
|
|
}
|
|
}
|
|
|
|
func newClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser,
|
|
agentConn net.Conn, agentGuid models.AgentGuid) *clientConnection {
|
|
client := models.Client{
|
|
Guid: models.ClientGuid(strconv.Itoa(rand.Int())),
|
|
RemoteAddr: models.RemoteAddr(clientConn.RemoteAddr().String()),
|
|
PublicId: publicId,
|
|
AgentGuid: agentGuid,
|
|
ClientId: models.ClientId(strconv.Itoa(clientIdGenerator.IncrementAndGet())),
|
|
StartTime: time.Now(),
|
|
}
|
|
return &clientConnection{
|
|
Info: &client,
|
|
agentConnection: agentConn,
|
|
clientConnection: clientConn,
|
|
}
|
|
}
|
|
|
|
func (match *clientConnection) Synchronize() {
|
|
iowrappers2.SynchronizeStreams("client -- agent", match.clientConnection, match.agentConnection)
|
|
}
|
|
|
|
type Admin struct {
|
|
// map of public id to agent
|
|
mutex sync.Mutex
|
|
// for reporting state to webclients and prometheus and also used for
|
|
// logging the state. This uses copy-on-write. Every time an agent or
|
|
// clinet is added or removed a copy is made.
|
|
state *models.State
|
|
|
|
// TODO: use linked map for both of these
|
|
agents map[models.RendezVousId]*agentConnection
|
|
clients map[models.ClientId]*clientConnection
|
|
}
|
|
|
|
func NewAdmin() *Admin {
|
|
return &Admin{
|
|
mutex: sync.Mutex{},
|
|
state: models.NewState(),
|
|
agents: make(map[models.RendezVousId]*agentConnection),
|
|
clients: make(map[models.ClientId]*clientConnection),
|
|
}
|
|
}
|
|
|
|
func (admin *Admin) Close() {
|
|
for _, client := range admin.clients {
|
|
client.clientConnection.Close()
|
|
client.agentConnection.Close()
|
|
}
|
|
for _, agent := range admin.agents {
|
|
agent.CommChannel.Session.Close()
|
|
}
|
|
}
|
|
|
|
func (admin *Admin) CreateNotifification() *models.State {
|
|
admin.mutex.Lock()
|
|
defer admin.mutex.Unlock()
|
|
return admin.state
|
|
}
|
|
|
|
func (admin *Admin) getFreeId(publicId models.RendezVousId) (models.RendezVousId, error) {
|
|
if admin.agents[publicId] == nil {
|
|
return publicId, nil
|
|
}
|
|
for i := range 100 {
|
|
candidate := models.RendezVousId(string(publicId) + "-" + strconv.Itoa(i))
|
|
if admin.agents[candidate] == nil {
|
|
return candidate, nil
|
|
}
|
|
}
|
|
return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId)
|
|
}
|
|
|
|
func (admin *Admin) AddAgent(hostKey []byte, publicId models.RendezVousId, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*agentConnection, error) {
|
|
admin.mutex.Lock()
|
|
defer admin.mutex.Unlock()
|
|
|
|
newPublicId, err := admin.getFreeId(publicId)
|
|
if err == nil {
|
|
message := "Requested id is accepted"
|
|
if publicId != newPublicId {
|
|
message = "The server allocated a new id."
|
|
}
|
|
publicId = newPublicId
|
|
err := comms.SendRegistrationMessage(conn, comms.AgentRegistration{
|
|
Ok: true,
|
|
Message: message,
|
|
Id: string(publicId),
|
|
HostPrivateKey: hostKey,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
comms.SendRegistrationMessage(conn, comms.AgentRegistration{
|
|
Ok: false,
|
|
Message: err.Error(),
|
|
})
|
|
return nil, fmt.Errorf(
|
|
"Agent requested id '%s' which is already taken anc could not allocate a new unique id", publicId)
|
|
}
|
|
agentCheck := admin.agents[publicId]
|
|
if agentCheck != nil {
|
|
return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId)
|
|
}
|
|
|
|
commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
agent := newAgent(commChannel, publicId, agentInfo)
|
|
|
|
admin.state = admin.state.Copy()
|
|
admin.state.Agents[agent.Info.Guid] = agent.Info
|
|
admin.agents[publicId] = agent
|
|
return agent, nil
|
|
}
|
|
|
|
func (admin *Admin) AddClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser) (*clientConnection, error) {
|
|
admin.mutex.Lock()
|
|
defer admin.mutex.Unlock()
|
|
|
|
agent := admin.agents[publicId]
|
|
if agent == nil {
|
|
// we should setup on-demend connections ot agents later.
|
|
return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId)
|
|
}
|
|
|
|
agentConn, err := admin.getAgentConnection(agent)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.Println("Successful websocket connection to agent")
|
|
client := newClient(publicId, clientConn, agentConn, agent.Info.Guid)
|
|
|
|
// Before using this connection for SSH we use it to send client metadata to the
|
|
// agent
|
|
err = comms.SendClientInfo(agentConn, string(client.Info.ClientId))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
admin.state = admin.state.Copy()
|
|
admin.state.Clients[client.Info.Guid] = client.Info
|
|
admin.clients[client.Info.ClientId] = client
|
|
return client, nil
|
|
}
|
|
|
|
func (admin *Admin) getAgentConnection(agent *agentConnection) (net.Conn, error) {
|
|
agentConn, err := agent.CommChannel.Session.Open()
|
|
count := 0
|
|
for err != nil && count < 10 {
|
|
log.Printf("Retrying connection to agent: %v", err)
|
|
time.Sleep(250 * time.Millisecond)
|
|
count++
|
|
agentConn, err = agent.CommChannel.Session.Open()
|
|
}
|
|
return agentConn, err
|
|
}
|
|
|
|
func (admin *Admin) RemoveAgent(publicId models.RendezVousId) error {
|
|
admin.mutex.Lock()
|
|
defer admin.mutex.Unlock()
|
|
|
|
agent := admin.agents[publicId]
|
|
if agent == nil {
|
|
return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
|
|
}
|
|
log.Printf("Removing agent: '%s'", publicId)
|
|
err := agent.CommChannel.Session.Close()
|
|
if err != nil {
|
|
log.Printf("Could not close yamux client session for '%s'\n", publicId)
|
|
}
|
|
admin.state = admin.state.Copy()
|
|
delete(admin.state.Agents, agent.Info.Guid)
|
|
delete(admin.agents, publicId)
|
|
return nil
|
|
}
|
|
|
|
func (admin *Admin) RemoveClient(client *clientConnection) error {
|
|
admin.mutex.Lock()
|
|
defer admin.mutex.Unlock()
|
|
|
|
log.Printf("Removing client: '%s' created at %s\n", client.Info.Guid,
|
|
client.Info.StartTime.Format(time.DateTime))
|
|
// try to explicitly close connection to the agent.
|
|
_ = client.agentConnection.Close()
|
|
_ = client.clientConnection.Close()
|
|
|
|
admin.state = admin.state.Copy()
|
|
delete(admin.state.Clients, client.Info.Guid)
|
|
delete(admin.clients, client.Info.ClientId)
|
|
return nil
|
|
}
|
|
|
|
func (admin *Admin) SetSessionType(clientId models.ClientId, sessionType models.SessionType) {
|
|
admin.mutex.Lock()
|
|
defer admin.mutex.Unlock()
|
|
for _, client := range admin.state.Clients {
|
|
if client.ClientId == clientId {
|
|
client.SessionType = sessionType
|
|
break
|
|
}
|
|
}
|
|
}
|