Much cleaner handling of concurrency in the converge server by separating administration from matchmaking: admin is moved into a separate package.

This commit is contained in:
Erik Brakkee 2024-08-11 16:17:11 +02:00
parent f82b21b845
commit 1c92fcb03e
8 changed files with 405 additions and 372 deletions

View File

@ -96,7 +96,7 @@ func main() {
}
notifications := NewStateNotifier()
admin := converge.NewAdmin(notifications)
admin := converge.NewMatchMaker(notifications)
websessions := converge.NewWebSessions(notifications.webNotificationChannel)
// For agents connecting

View File

@ -96,6 +96,7 @@ func main() {
state.Agents = append(state.Agents, agent)
client := models.Client{
Guid: strconv.Itoa(rand.Int()),
RemoteAddr: "10.1.3.3",
PublicId: "c1",
AgentGuid: "12342342",
ClientId: "3",

View File

@ -7,6 +7,7 @@ import (
type Client struct {
Guid string
RemoteAddr string
PublicId string
ClientId string
AgentGuid string

View File

@ -6,5 +6,4 @@ type State struct {
Agents []Agent
Clients []Client
Ascii string
}

237
pkg/server/admin/admin.go Normal file
View File

@ -0,0 +1,237 @@
package admin
import (
"converge/pkg/comms"
"converge/pkg/models"
"converge/pkg/support/concurrency"
iowrappers2 "converge/pkg/support/iowrappers"
"fmt"
"io"
"log"
"math/rand"
"net"
"strconv"
"sync"
"time"
)
// agentConnection pairs an agent's administrative metadata with the
// multiplexed communication channel used to reach it.
type agentConnection struct {
	models.Agent
	// server session
	CommChannel comms.CommChannel
}

// Atomic counters for generating unique ids.
// NOTE(review): agentIdGenerator is never used in this file (agent Guids
// are random ints below) — confirm it is needed or remove it.
var agentIdGenerator = concurrency.NewAtomicCounter()
var clientIdGenerator = concurrency.NewAtomicCounter()

// ClientConnection pairs a client's administrative metadata with both ends
// of the bridged connection: the stream opened to the agent and the
// client's own connection.
type ClientConnection struct {
	models.Client
	AgentConnection  net.Conn
	ClientConnection iowrappers2.ReadWriteAddrCloser
}
// newAgent wraps a freshly registered agent's metadata and its multiplexed
// communication channel in an agentConnection. The Guid is a random id
// distinguishing this registration from any later one under the same
// public id.
func newAgent(commChannel comms.CommChannel, publicId string, agentInfo comms.EnvironmentInfo) *agentConnection {
	meta := models.Agent{
		Guid:            strconv.Itoa(rand.Int()),
		PublicId:        publicId,
		StartTime:       time.Now(),
		EnvironmentInfo: agentInfo,
	}
	return &agentConnection{
		Agent:       meta,
		CommChannel: commChannel,
	}
}
// newClient records a newly matched client together with the two streams
// that will be bridged: the client's own connection and the stream opened
// to the agent identified by agentGuid. ClientId is a process-wide
// sequential id used in log output and agent messages.
func newClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser,
	agentConn net.Conn, agentGuid string) *ClientConnection {
	meta := models.Client{
		Guid:       strconv.Itoa(rand.Int()),
		RemoteAddr: clientConn.RemoteAddr().String(),
		PublicId:   publicId,
		AgentGuid:  agentGuid,
		ClientId:   strconv.Itoa(clientIdGenerator.IncrementAndGet()),
		StartTime:  time.Now(),
	}
	return &ClientConnection{
		Client:           meta,
		AgentConnection:  agentConn,
		ClientConnection: clientConn,
	}
}
// Admin is the administration of connected agents and matched clients.
// All methods synchronize on mutex, so an *Admin is safe for concurrent
// use by multiple goroutines.
type Admin struct {
	// mutex guards agents and clients.
	mutex sync.Mutex
	// map of public id to agent
	agents map[string]*agentConnection
	// all currently connected clients.
	clients []*ClientConnection
}
// NewAdmin returns an empty administration, ready for agent and client
// registrations. The zero-value mutex needs no initialization.
func NewAdmin() *Admin {
	return &Admin{
		agents:  make(map[string]*agentConnection),
		clients: make([]*ClientConnection, 0), // not strictly needed
	}
}
// CreateNotifification takes a consistent snapshot of all agents and
// clients, for publication to notification listeners without holding the
// administration lock.
// NOTE(review): the method name misspells "Notification"; it is kept
// as-is because callers outside this package use it.
func (admin *Admin) CreateNotifification() *models.State {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()
	agents := make([]models.Agent, 0, len(admin.agents))
	for _, agent := range admin.agents {
		agents = append(agents, agent.Agent)
	}
	clients := make([]models.Client, 0, len(admin.clients))
	for _, client := range admin.clients {
		clients = append(clients, client.Client)
	}
	return &models.State{
		Agents:  agents,
		Clients: clients,
	}
}
// getFreeId returns a free public id for an agent, preferring the
// requested publicId. When that id is taken it tries "publicId-0" through
// "publicId-99" and returns an error when all candidates are in use.
//
// Must be called with admin.mutex held.
func (admin *Admin) getFreeId(publicId string) (string, error) {
	usedIds := make(map[string]bool, len(admin.agents))
	for _, agent := range admin.agents {
		usedIds[agent.PublicId] = true
	}
	if !usedIds[publicId] {
		return publicId, nil
	}
	// The requested id is taken: fall back to numbered variants.
	// (The original re-tested usedIds[publicId] here, which is always
	// true after the early return above.)
	for i := 0; i < 100; i++ {
		candidate := publicId + "-" + strconv.Itoa(i)
		if !usedIds[candidate] {
			return candidate, nil
		}
	}
	return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId)
}
// AddAgent registers an agent under (a possibly adjusted variant of) the
// requested public id. The registration outcome is reported back to the
// agent over conn before the multiplexed communication channel is set up.
// On success the returned agentConnection has been added to the
// administration; on failure nothing is registered.
func (admin *Admin) AddAgent(publicId string, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*agentConnection, error) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()
	newPublicId, err := admin.getFreeId(publicId)
	if err != nil {
		// No id could be allocated: tell the agent and return the real
		// error instead of falling through with an unusable id (the
		// original continued and only failed on the duplicate-id check).
		comms.SendRegistrationMessage(conn, comms.AgentRegistration{
			Ok:      false,
			Message: err.Error(),
		})
		return nil, err
	}
	message := "Requested id is accepted"
	if publicId != newPublicId {
		message = "The server allocated a new id."
	}
	publicId = newPublicId
	comms.SendRegistrationMessage(conn, comms.AgentRegistration{
		Ok:      true,
		Message: message,
		Id:      publicId,
	})
	// getFreeId (called under the same lock) guarantees the id is unused;
	// this is a purely defensive check.
	if admin.agents[publicId] != nil {
		return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId)
	}
	commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
	if err != nil {
		return nil, err
	}
	agent := newAgent(commChannel, publicId, agentInfo)
	admin.agents[publicId] = agent
	return agent, nil
}
// AddClient matches a client to the agent registered under publicId and
// records the client in the administration. A new stream to the agent is
// opened (with retries) and the client's id is sent to the agent over
// that stream before it is handed over for payload traffic.
func (admin *Admin) AddClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser) (*ClientConnection, error) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()
	agent := admin.agents[publicId]
	if agent == nil {
		// we should set up on-demand connections to agents later.
		return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId)
	}
	agentConn, err := admin.getAgentConnection(agent)
	if err != nil {
		return nil, err
	}
	log.Println("Successful websocket connection to agent")
	log.Println("Sending connection information to agent")
	client := newClient(publicId, clientConn, agentConn, agent.Guid)
	// Before using this connection for SSH we use it to send client metadata to the
	// agent
	err = comms.SendClientInfo(agentConn, comms.ClientInfo{
		ClientId: client.ClientId,
	})
	if err != nil {
		return nil, err
	}
	admin.clients = append(admin.clients, client)
	return client, nil
}
// getAgentConnection opens a new multiplexed stream to the agent,
// retrying up to ten times with a 250ms pause between attempts. It
// returns the last error when all attempts fail.
func (admin *Admin) getAgentConnection(agent *agentConnection) (net.Conn, error) {
	conn, err := agent.CommChannel.Session.Open()
	for attempt := 0; err != nil && attempt < 10; attempt++ {
		log.Printf("Retrying connection to agent: %v", err)
		time.Sleep(250 * time.Millisecond)
		conn, err = agent.CommChannel.Session.Open()
	}
	return conn, err
}
// RemoveAgent closes the agent's multiplexed session and removes it from
// the administration. It returns an error when no agent is registered
// under publicId; a failure to close the session is only logged.
func (admin *Admin) RemoveAgent(publicId string) error {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()
	agent, found := admin.agents[publicId]
	if !found {
		return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
	}
	log.Printf("Removing agent: '%s'", publicId)
	if err := agent.CommChannel.Session.Close(); err != nil {
		log.Printf("Could not close yamux client session for '%s'\n", publicId)
	}
	delete(admin.agents, publicId)
	return nil
}
// RemoveClient closes both ends of the client's bridged connection and
// drops the client from the administration. It always returns nil.
func (admin *Admin) RemoveClient(client *ClientConnection) error {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()
	log.Printf("Removing client: '%s' created at %s\n", client.ClientId,
		client.StartTime.Format(time.DateTime))
	// try to explicitly close connection to the agent; close errors are
	// irrelevant during teardown.
	_ = client.AgentConnection.Close()
	_ = client.ClientConnection.Close()
	for i := range admin.clients {
		if admin.clients[i].ClientId == client.ClientId {
			admin.clients = append(admin.clients[:i], admin.clients[i+1:]...)
			break
		}
	}
	return nil
}
// SetSessionType records the session type of the client identified by
// clientId. Unknown client ids are silently ignored.
func (admin *Admin) SetSessionType(clientId string, sessionType string) {
	admin.mutex.Lock()
	defer admin.mutex.Unlock()
	for _, candidate := range admin.clients {
		if candidate.ClientId == clientId {
			candidate.SessionType = sessionType
			return
		}
	}
}

View File

@ -1,370 +0,0 @@
package converge
import (
"converge/pkg/comms"
"converge/pkg/models"
"converge/pkg/support/concurrency"
iowrappers2 "converge/pkg/support/iowrappers"
"fmt"
"io"
"log"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
)
type AgentConnection struct {
models.Agent
// server session
commChannel comms.CommChannel
}
var agentIdGenerator = concurrency.NewAtomicCounter()
var clientIdGenerator = concurrency.NewAtomicCounter()
type ClientConnection struct {
models.Client
agent net.Conn
client iowrappers2.ReadWriteAddrCloser
}
func NewAgent(commChannel comms.CommChannel, publicId string, agentInfo comms.EnvironmentInfo) *AgentConnection {
return &AgentConnection{
Agent: models.Agent{
Guid: strconv.Itoa(rand.Int()),
PublicId: publicId,
StartTime: time.Now(),
EnvironmentInfo: agentInfo,
},
commChannel: commChannel,
}
}
func NewClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser,
agentConn net.Conn, agentGuid string) *ClientConnection {
return &ClientConnection{
Client: models.Client{
Guid: strconv.Itoa(rand.Int()),
PublicId: publicId,
AgentGuid: agentGuid,
ClientId: strconv.Itoa(clientIdGenerator.IncrementAndGet()),
StartTime: time.Now(),
},
agent: agentConn,
client: clientConn,
}
}
type Admin struct {
// map of public id to agent
mutex sync.Mutex
agents map[string]*AgentConnection
clients []*ClientConnection
notifications Notifier
}
func NewAdmin(notifications Notifier) *Admin {
admin := Admin{
mutex: sync.Mutex{},
agents: make(map[string]*AgentConnection),
clients: make([]*ClientConnection, 0), // not strictly needed
notifications: notifications,
}
admin.logStatus()
return &admin
}
func (admin *Admin) createNotification() *models.State {
state := models.State{}
state.Agents = make([]models.Agent, 0, len(admin.agents))
state.Clients = make([]models.Client, 0, len(admin.clients))
for _, agent := range admin.agents {
state.Agents = append(state.Agents, agent.Agent)
}
for _, client := range admin.clients {
state.Clients = append(state.Clients, client.Client)
}
return &state
}
func (admin *Admin) logStatus() {
format := "%-20s %-20s %-20s %-10s %-15s %-20s"
lines := make([]string, 0, 100)
lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
"USER", "HOST", "OS"))
for _, agent := range admin.agents {
agent.commChannel.Session.RemoteAddr()
lines = append(lines, fmt.Sprintf(format, agent.PublicId,
agent.StartTime.Format(time.DateTime),
agent.ExpiryTime.Format(time.DateTime),
agent.EnvironmentInfo.Username,
agent.EnvironmentInfo.Hostname,
agent.EnvironmentInfo.OS))
}
lines = append(lines, "")
format = "%-10s %-20s %-20s %-20s %-20s"
lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE"))
for _, client := range admin.clients {
lines = append(lines, fmt.Sprintf(format,
client.ClientId,
client.PublicId,
client.StartTime.Format(time.DateTime),
client.client.RemoteAddr(),
client.SessionType))
}
lines = append(lines, "")
for _, line := range lines {
log.Println(line)
}
notification := admin.createNotification()
notification.Ascii = strings.Join(lines, "\n")
admin.notifications.Publish(notification)
}
func (admin *Admin) getFreeId(publicId string) (string, error) {
usedIds := make(map[string]bool)
for _, agent := range admin.agents {
usedIds[agent.PublicId] = true
}
if !usedIds[publicId] {
return publicId, nil
}
if usedIds[publicId] {
for i := 0; i < 100; i++ {
candidate := publicId + "-" + strconv.Itoa(i)
if !usedIds[candidate] {
return candidate, nil
}
}
}
return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId)
}
func (admin *Admin) addAgent(publicId string, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*AgentConnection, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
newPublicId, err := admin.getFreeId(publicId)
if err == nil {
message := "Requested id is accepted"
if publicId != newPublicId {
message = "The server allocated a new id."
}
publicId = newPublicId
comms.SendRegistrationMessage(conn, comms.AgentRegistration{
Ok: true,
Message: message,
Id: publicId,
})
} else {
comms.SendRegistrationMessage(conn, comms.AgentRegistration{
Ok: false,
Message: err.Error(),
})
}
agent := admin.agents[publicId]
if agent != nil {
return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId)
}
commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
if err != nil {
return nil, err
}
agent = NewAgent(commChannel, publicId, agentInfo)
admin.agents[publicId] = agent
admin.logStatus()
return agent, nil
}
func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser) (*ClientConnection, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
// we should setup on-demend connections ot agents later.
return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId)
}
agentConn, err := admin.getAgentConnection(agent)
if err != nil {
return nil, err
}
log.Println("Successful websocket connection to agent")
log.Println("Sending connection information to agent")
client := NewClient(publicId, clientConn, agentConn, agent.Guid)
// Before using this connection for SSH we use it to send client metadata to the
// agent
err = comms.SendClientInfo(agentConn, comms.ClientInfo{
ClientId: client.ClientId,
})
if err != nil {
return nil, err
}
admin.clients = append(admin.clients, client)
admin.logStatus()
return client, nil
}
func (admin *Admin) getAgentConnection(agent *AgentConnection) (net.Conn, error) {
agentConn, err := agent.commChannel.Session.Open()
count := 0
for err != nil && count < 10 {
log.Printf("Retrying connection to agent: %v", err)
time.Sleep(250 * time.Millisecond)
count++
agentConn, err = agent.commChannel.Session.Open()
}
return agentConn, err
}
func (admin *Admin) RemoveAgent(publicId string) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
}
log.Printf("Removing agent: '%s'", publicId)
err := agent.commChannel.Session.Close()
if err != nil {
log.Printf("Could not close yamux client session for '%s'\n", publicId)
}
delete(admin.agents, publicId)
admin.logStatus()
return nil
}
func (admin *Admin) RemoveClient(client *ClientConnection) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
log.Printf("Removing client: '%s' created at %s\n", client.ClientId,
client.StartTime.Format(time.DateTime))
// try to explicitly close connection to the agent.
_ = client.agent.Close()
_ = client.client.Close()
for i, _client := range admin.clients {
if _client.ClientId == client.ClientId {
admin.clients = append(admin.clients[:i], admin.clients[i+1:]...)
break
}
}
admin.logStatus()
return nil
}
func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error {
serverInfo := comms.ServerInfo{}
agentInfo, err := comms.ServerInitialization(conn, serverInfo)
if err != nil {
return err
}
agent, err := admin.addAgent(publicId, agentInfo, conn)
if err != nil {
return err
}
publicId = agent.PublicId
defer func() {
admin.RemoveAgent(publicId)
}()
go func() {
comms.ListenForAgentEvents(agent.commChannel.SideChannel,
func(info comms.EnvironmentInfo) {
agent.EnvironmentInfo = info
admin.logStatus()
},
func(session comms.SessionInfo) {
log.Println("Recceived sessioninfo ", session)
for _, client := range admin.clients {
// a bit hacky. There should be at most one client that has an unset session
// Very unlikely for multiple sessions to start at the same point in time.
if client.ClientId == session.ClientId {
client.SessionType = session.SessionType
break
}
}
},
func(expiry comms.ExpiryTimeUpdate) {
agent.ExpiryTime = expiry.ExpiryTime
admin.logStatus()
})
}()
go log.Printf("AgentConnection registered: '%s'\n", publicId)
for !agent.commChannel.Session.IsClosed() {
time.Sleep(250 * time.Millisecond)
}
return nil
}
func (admin *Admin) Connect(wsProxyMode bool, publicId string, conn iowrappers2.ReadWriteAddrCloser) error {
defer conn.Close()
log.Printf("Using wsproxy protocol %v", wsProxyMode)
channel := comms.NewGOBChannel(conn)
if wsProxyMode {
err := comms.SendWithTimeout(
channel,
comms.ProtocolVersion{
Version: comms.PROTOCOL_VERSION,
})
if err != nil {
log.Printf("Error sending protocol version to client %v", err)
return err
}
}
client, err := admin.addClient(publicId, conn)
if err != nil {
if wsProxyMode {
_ = comms.SendWithTimeout(channel,
comms.ClientConnectionInfo{
Ok: false,
Message: err.Error(),
})
}
return err
}
defer func() {
admin.RemoveClient(client)
}()
log.Printf("Connecting client and agent: '%s'\n", publicId)
if wsProxyMode {
err = comms.SendWithTimeout(channel,
comms.ClientConnectionInfo{
Ok: true,
Message: "Connecting to agent",
})
if err != nil {
return fmt.Errorf("Error sending connection info to client: %v", err)
}
clientEnvironment, err := comms.ReceiveWithTimeout[comms.EnvironmentInfo](channel)
if err != nil {
return fmt.Errorf("Error receiving environment info from client: %v", err)
}
client.EnvironmentInfo = clientEnvironment
admin.logStatus()
}
iowrappers2.SynchronizeStreams("client -- agent", client.client, client.agent)
return nil
}

View File

@ -0,0 +1,164 @@
package converge
import (
"converge/pkg/comms"
"converge/pkg/models"
"converge/pkg/server/admin"
iowrappers2 "converge/pkg/support/iowrappers"
"fmt"
"io"
"log"
"time"
)
// MatchMaker connects clients to agents via the administration and
// publishes a status notification whenever the administration changes.
type MatchMaker struct {
	// Held by pointer: admin.Admin contains a sync.Mutex, and the
	// original value field copied the struct (flagged by go vet's
	// copylocks check).
	admin    *admin.Admin
	notifier Notifier
}

// NewMatchMaker creates a MatchMaker with an empty administration and
// immediately logs/publishes the initial (empty) status.
func NewMatchMaker(notifier Notifier) *MatchMaker {
	converge := MatchMaker{
		admin:    admin.NewAdmin(),
		notifier: notifier,
	}
	converge.logStatus()
	return &converge
}
// Register registers an agent under publicId and blocks until the agent's
// session is closed. While registered, side-channel events from the agent
// (environment info, session info, expiry-time updates) are applied to
// the administration and trigger status notifications. The deferred
// cleanup unregisters the agent when the session ends.
func (converge *MatchMaker) Register(publicId string, conn io.ReadWriteCloser) error {
	serverInfo := comms.ServerInfo{}
	agentInfo, err := comms.ServerInitialization(conn, serverInfo)
	if err != nil {
		return err
	}
	agent, err := converge.admin.AddAgent(publicId, agentInfo, conn)
	converge.logStatus()
	if err != nil {
		return err
	}
	// The server may have allocated a different id than requested.
	publicId = agent.PublicId
	defer func() {
		converge.admin.RemoveAgent(publicId)
		converge.logStatus()
	}()
	go func() {
		comms.ListenForAgentEvents(agent.CommChannel.SideChannel,
			func(info comms.EnvironmentInfo) {
				agent.EnvironmentInfo = info
				converge.logStatus()
			},
			func(session comms.SessionInfo) {
				log.Println("Received sessioninfo ", session)
				converge.admin.SetSessionType(session.ClientId, session.SessionType)
			},
			func(expiry comms.ExpiryTimeUpdate) {
				agent.ExpiryTime = expiry.ExpiryTime
				converge.logStatus()
			})
	}()
	// Log directly: the original spawned a goroutine just for this call,
	// which serves no purpose and loses ordering.
	log.Printf("agentConnection registered: '%s'\n", publicId)
	// Block until the agent disconnects; polling because the session
	// offers no close notification here.
	for !agent.CommChannel.Session.IsClosed() {
		time.Sleep(250 * time.Millisecond)
	}
	return nil
}
// Connect bridges a client connection to the agent registered under
// publicId and blocks until both streams are done. In wsProxyMode a
// converge-specific handshake (protocol version, connection status,
// client environment) is exchanged over a GOB channel before raw
// bridging starts; otherwise the connection is bridged as-is.
func (converge *MatchMaker) Connect(wsProxyMode bool, publicId string, conn iowrappers2.ReadWriteAddrCloser) error {
	defer conn.Close()
	log.Printf("Using wsproxy protocol %v", wsProxyMode)
	channel := comms.NewGOBChannel(conn)
	if wsProxyMode {
		err := comms.SendWithTimeout(
			channel,
			comms.ProtocolVersion{
				Version: comms.PROTOCOL_VERSION,
			})
		if err != nil {
			log.Printf("Error sending protocol version to client %v", err)
			return err
		}
	}
	client, err := converge.admin.AddClient(publicId, conn)
	converge.logStatus()
	if err != nil {
		// Matching failed: in wsproxy mode the client is told why,
		// best-effort (send error deliberately ignored).
		if wsProxyMode {
			_ = comms.SendWithTimeout(channel,
				comms.ClientConnectionInfo{
					Ok:      false,
					Message: err.Error(),
				})
		}
		return err
	}
	defer func() {
		converge.admin.RemoveClient(client)
		converge.logStatus()
	}()
	log.Printf("Connecting client and agent: '%s'\n", publicId)
	if wsProxyMode {
		err = comms.SendWithTimeout(channel,
			comms.ClientConnectionInfo{
				Ok:      true,
				Message: "Connecting to agent",
			})
		if err != nil {
			return fmt.Errorf("Error sending connection info to client: %v", err)
		}
		// := shadows the outer err; harmless, as it is checked immediately.
		clientEnvironment, err := comms.ReceiveWithTimeout[comms.EnvironmentInfo](channel)
		if err != nil {
			return fmt.Errorf("Error receiving environment info from client: %v", err)
		}
		client.EnvironmentInfo = clientEnvironment
		converge.logStatus()
	}
	// Pump bytes in both directions until either side closes.
	iowrappers2.SynchronizeStreams("client -- agent", client.ClientConnection, client.AgentConnection)
	return nil
}
// logStatus snapshots the administration, then logs and publishes it.
func (converge *MatchMaker) logStatus() {
	logStatusImpl(converge.admin.CreateNotifification(), converge.notifier)
}
// logStatusImpl renders a tabular overview of agents and clients to the
// log and publishes the snapshot to the notifier. It works on a detached
// snapshot, so no lock on the administration is held while logging.
//
// The parameter is named state: the original called it "admin", which
// shadowed the imported admin package and mislabeled a *models.State.
func logStatusImpl(state *models.State, notifier Notifier) {
	format := "%-20s %-20s %-20s %-10s %-15s %-20s"
	lines := make([]string, 0, 100)
	lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
		"USER", "HOST", "OS"))
	for _, agent := range state.Agents {
		lines = append(lines, fmt.Sprintf(format, agent.PublicId,
			agent.StartTime.Format(time.DateTime),
			agent.ExpiryTime.Format(time.DateTime),
			agent.EnvironmentInfo.Username,
			agent.EnvironmentInfo.Hostname,
			agent.EnvironmentInfo.OS))
	}
	lines = append(lines, "")
	format = "%-10s %-20s %-20s %-20s %-20s"
	lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE"))
	for _, client := range state.Clients {
		lines = append(lines, fmt.Sprintf(format,
			client.ClientId,
			client.PublicId,
			client.StartTime.Format(time.DateTime),
			client.RemoteAddr,
			client.SessionType))
	}
	lines = append(lines, "")
	for _, line := range lines {
		log.Println(line)
	}
	notifier.Publish(state)
}

View File

@ -87,6 +87,7 @@ func (session *WebSession) WriteNotifications(location *time.Location, cancel co
case notification, ok := <-session.notifications:
if !ok {
log.Println("channel closed")
return
}
//log.Println("Got notification: ", notification)
err := templates.State(notification, location).Render(context.Background(), session.conn)