Separated out the metadata about the agents and clients from the agentConnection and clientConnection objects. Now, the state does not need to be copied anymore when sending notifications.

The matchmaker uses copy on write every time a new agent or client connects or disconnects.
This commit is contained in:
Erik Brakkee 2024-08-15 22:08:24 +02:00
parent 56ad9fbf03
commit cdfe7c2a47
3 changed files with 67 additions and 50 deletions

View File

@ -66,3 +66,15 @@ func NewState() *State {
Clients: collections.NewLinkedMap[ClientGuid, *Client](), Clients: collections.NewLinkedMap[ClientGuid, *Client](),
} }
} }
// for copy on write
func (state *State) Copy() *State {
res := NewState()
for entry := range state.Agents.RangeEntries() {
res.Agents.Put(entry.Key, entry.Value)
}
for entry := range state.Clients.RangeEntries() {
res.Clients.Put(entry.Key, entry.Value)
}
return res
}

View File

@ -16,7 +16,7 @@ import (
) )
type agentConnection struct { type agentConnection struct {
models.Agent Info *models.Agent
// server session // server session
CommChannel comms.CommChannel CommChannel comms.CommChannel
} }
@ -25,35 +25,37 @@ var agentIdGenerator = concurrency.NewAtomicCounter()
var clientIdGenerator = concurrency.NewAtomicCounter() var clientIdGenerator = concurrency.NewAtomicCounter()
type ClientConnection struct { type ClientConnection struct {
models.Client Info *models.Client
agentConnection net.Conn agentConnection net.Conn
clientConnection iowrappers2.ReadWriteAddrCloser clientConnection iowrappers2.ReadWriteAddrCloser
} }
func newAgent(commChannel comms.CommChannel, publicId models.RendezVousId, agentInfo comms.EnvironmentInfo) *agentConnection { func newAgent(commChannel comms.CommChannel, publicId models.RendezVousId, agentInfo comms.EnvironmentInfo) *agentConnection {
return &agentConnection{ agent := models.Agent{
Agent: models.Agent{
Guid: models.AgentGuid(strconv.Itoa(rand.Int())), Guid: models.AgentGuid(strconv.Itoa(rand.Int())),
RemoteAddr: models.RemoteAddr(commChannel.Session.RemoteAddr().String()), RemoteAddr: models.RemoteAddr(commChannel.Session.RemoteAddr().String()),
PublicId: publicId, PublicId: publicId,
StartTime: time.Now(), StartTime: time.Now(),
EnvironmentInfo: agentInfo, EnvironmentInfo: agentInfo,
}, }
return &agentConnection{
Info: &agent,
CommChannel: commChannel, CommChannel: commChannel,
} }
} }
func newClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser, func newClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAddrCloser,
agentConn net.Conn, agentGuid models.AgentGuid) *ClientConnection { agentConn net.Conn, agentGuid models.AgentGuid) *ClientConnection {
return &ClientConnection{ client := models.Client{
Client: models.Client{
Guid: models.ClientGuid(strconv.Itoa(rand.Int())), Guid: models.ClientGuid(strconv.Itoa(rand.Int())),
RemoteAddr: models.RemoteAddr(clientConn.RemoteAddr().String()), RemoteAddr: models.RemoteAddr(clientConn.RemoteAddr().String()),
PublicId: publicId, PublicId: publicId,
AgentGuid: agentGuid, AgentGuid: agentGuid,
ClientId: models.ClientId(strconv.Itoa(clientIdGenerator.IncrementAndGet())), ClientId: models.ClientId(strconv.Itoa(clientIdGenerator.IncrementAndGet())),
StartTime: time.Now(), StartTime: time.Now(),
}, }
return &ClientConnection{
Info: &client,
agentConnection: agentConn, agentConnection: agentConn,
clientConnection: clientConn, clientConnection: clientConn,
} }
@ -66,6 +68,12 @@ func (match *ClientConnection) Synchronize() {
type Admin struct { type Admin struct {
// map of public id to agent // map of public id to agent
mutex sync.Mutex mutex sync.Mutex
// for reporting state to webclients and prometheus and also used for
// logging the state. This uses copy-on-write. Every time an agent or
// clinet is added or removed a copy is made.
state *models.State
// TODO: use linked map for both of these
agents map[models.RendezVousId]*agentConnection agents map[models.RendezVousId]*agentConnection
clients []*ClientConnection clients []*ClientConnection
} }
@ -73,6 +81,7 @@ type Admin struct {
func NewAdmin() *Admin { func NewAdmin() *Admin {
return &Admin{ return &Admin{
mutex: sync.Mutex{}, mutex: sync.Mutex{},
state: models.NewState(),
agents: make(map[models.RendezVousId]*agentConnection), agents: make(map[models.RendezVousId]*agentConnection),
clients: make([]*ClientConnection, 0), // not strictly needed clients: make([]*ClientConnection, 0), // not strictly needed
} }
@ -81,30 +90,17 @@ func NewAdmin() *Admin {
func (admin *Admin) CreateNotifification() *models.State { func (admin *Admin) CreateNotifification() *models.State {
admin.mutex.Lock() admin.mutex.Lock()
defer admin.mutex.Unlock() defer admin.mutex.Unlock()
state := models.NewState() return admin.state
for _, agent := range admin.agents {
state.Agents.Put(agent.Guid, &agent.Agent)
}
for _, client := range admin.clients {
state.Clients.Put(client.Guid, &client.Client)
}
return state
} }
func (admin *Admin) getFreeId(publicId models.RendezVousId) (models.RendezVousId, error) { func (admin *Admin) getFreeId(publicId models.RendezVousId) (models.RendezVousId, error) {
usedIds := make(map[models.RendezVousId]bool) if admin.agents[publicId] == nil {
for _, agent := range admin.agents {
usedIds[agent.PublicId] = true
}
if !usedIds[publicId] {
return publicId, nil return publicId, nil
} }
if usedIds[publicId] {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
candidate := string(publicId) + "-" + strconv.Itoa(i) candidate := models.RendezVousId(string(publicId) + "-" + strconv.Itoa(i))
if !usedIds[models.RendezVousId(candidate)] { if admin.agents[candidate] == nil {
return models.RendezVousId(candidate), nil return candidate, nil
}
} }
} }
return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId) return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId)
@ -132,8 +128,8 @@ func (admin *Admin) AddAgent(publicId models.RendezVousId, agentInfo comms.Envir
Message: err.Error(), Message: err.Error(),
}) })
} }
agent := admin.agents[publicId] agentCheck := admin.agents[publicId]
if agent != nil { if agentCheck != nil {
return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId) return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId)
} }
@ -142,7 +138,10 @@ func (admin *Admin) AddAgent(publicId models.RendezVousId, agentInfo comms.Envir
if err != nil { if err != nil {
return nil, err return nil, err
} }
agent = newAgent(commChannel, publicId, agentInfo) agent := newAgent(commChannel, publicId, agentInfo)
admin.state = admin.state.Copy()
admin.state.Agents.Put(agent.Info.Guid, agent.Info)
admin.agents[publicId] = agent admin.agents[publicId] = agent
return agent, nil return agent, nil
} }
@ -165,17 +164,19 @@ func (admin *Admin) AddClient(publicId models.RendezVousId, clientConn iowrapper
log.Println("Sending connection information to agent") log.Println("Sending connection information to agent")
client := newClient(publicId, clientConn, agentConn, agent.Guid) client := newClient(publicId, clientConn, agentConn, agent.Info.Guid)
// Before using this connection for SSH we use it to send client metadata to the // Before using this connection for SSH we use it to send client metadata to the
// agent // agent
err = comms.SendClientInfo(agentConn, comms.ClientInfo{ err = comms.SendClientInfo(agentConn, comms.ClientInfo{
ClientId: string(client.ClientId), ClientId: string(client.Info.ClientId),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
admin.state = admin.state.Copy()
admin.state.Clients.Put(client.Info.Guid, client.Info)
admin.clients = append(admin.clients, client) admin.clients = append(admin.clients, client)
return client, nil return client, nil
} }
@ -205,6 +206,8 @@ func (admin *Admin) RemoveAgent(publicId models.RendezVousId) error {
if err != nil { if err != nil {
log.Printf("Could not close yamux client session for '%s'\n", publicId) log.Printf("Could not close yamux client session for '%s'\n", publicId)
} }
admin.state = admin.state.Copy()
admin.state.Agents.Delete(agent.Info.Guid)
delete(admin.agents, publicId) delete(admin.agents, publicId)
return nil return nil
} }
@ -213,14 +216,16 @@ func (admin *Admin) RemoveClient(client *ClientConnection) error {
admin.mutex.Lock() admin.mutex.Lock()
defer admin.mutex.Unlock() defer admin.mutex.Unlock()
log.Printf("Removing client: '%s' created at %s\n", client.ClientId, log.Printf("Removing client: '%s' created at %s\n", client.Info.Guid,
client.StartTime.Format(time.DateTime)) client.Info.StartTime.Format(time.DateTime))
// try to explicitly close connection to the agent. // try to explicitly close connection to the agent.
_ = client.agentConnection.Close() _ = client.agentConnection.Close()
_ = client.clientConnection.Close() _ = client.clientConnection.Close()
for i, _client := range admin.clients { for i, _client := range admin.clients {
if _client.ClientId == client.ClientId { if _client.Info.ClientId == client.Info.ClientId {
admin.state = admin.state.Copy()
admin.state.Clients.Delete(client.Info.Guid)
admin.clients = append(admin.clients[:i], admin.clients[i+1:]...) admin.clients = append(admin.clients[:i], admin.clients[i+1:]...)
break break
} }
@ -231,7 +236,7 @@ func (admin *Admin) RemoveClient(client *ClientConnection) error {
func (admin *Admin) SetSessionType(clientId models.ClientId, sessionType models.SessionType) { func (admin *Admin) SetSessionType(clientId models.ClientId, sessionType models.SessionType) {
admin.mutex.Lock() admin.mutex.Lock()
defer admin.mutex.Unlock() defer admin.mutex.Unlock()
for _, client := range admin.clients { for client := range admin.state.Clients.RangeValues() {
if client.ClientId == clientId { if client.ClientId == clientId {
client.SessionType = sessionType client.SessionType = sessionType
break break

View File

@ -39,7 +39,7 @@ func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadW
if err != nil { if err != nil {
return err return err
} }
publicId = agent.PublicId publicId = agent.Info.PublicId
defer func() { defer func() {
converge.admin.RemoveAgent(publicId) converge.admin.RemoveAgent(publicId)
converge.logStatus() converge.logStatus()
@ -48,7 +48,7 @@ func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadW
go func() { go func() {
comms.ListenForAgentEvents(agent.CommChannel.SideChannel, comms.ListenForAgentEvents(agent.CommChannel.SideChannel,
func(info comms.EnvironmentInfo) { func(info comms.EnvironmentInfo) {
agent.EnvironmentInfo = info agent.Info.EnvironmentInfo = info
converge.logStatus() converge.logStatus()
}, },
func(session comms.SessionInfo) { func(session comms.SessionInfo) {
@ -56,7 +56,7 @@ func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadW
converge.admin.SetSessionType(models.ClientId(session.ClientId), models.SessionType(session.SessionType)) converge.admin.SetSessionType(models.ClientId(session.ClientId), models.SessionType(session.SessionType))
}, },
func(expiry comms.ExpiryTimeUpdate) { func(expiry comms.ExpiryTimeUpdate) {
agent.SetExpiryTime(expiry.ExpiryTime) agent.Info.SetExpiryTime(expiry.ExpiryTime)
converge.logStatus() converge.logStatus()
}) })
}() }()
@ -114,7 +114,7 @@ func (converge *MatchMaker) Connect(wsProxyMode bool, publicId models.RendezVous
if err != nil { if err != nil {
return fmt.Errorf("Error receiving environment info from client: %v", err) return fmt.Errorf("Error receiving environment info from client: %v", err)
} }
client.EnvironmentInfo = clientEnvironment client.Info.EnvironmentInfo = clientEnvironment
} }
converge.logStatus() converge.logStatus()
client.Synchronize() client.Synchronize()