converge/pkg/server/matchmaker/matchmaker.go

206 lines
5.2 KiB
Go

package matchmaker
import (
_ "embed"
"fmt"
"git.wamblee.org/converge/pkg/comms"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/server/admin"
iowrappers2 "git.wamblee.org/converge/pkg/support/ioutils"
"io"
"log"
"time"
)
// Use a fixed host key for all agents. Using a dynamic host key would be madness.
// An alternative would be to configure the host key on the server side and send it
// to agents before establishing a session.
//
//go:embed hostkey.pem
var hostPrivateKey []byte
type MatchMaker struct {
admin admin.Admin
notifier Notifier
}
func NewMatchMaker(notifier Notifier) *MatchMaker {
converge := MatchMaker{
admin: *admin.NewAdmin(),
notifier: notifier,
}
converge.logStatus()
return &converge
}
func (converge *MatchMaker) Close() {
converge.admin.Close()
}
type WaitForAgentFunc func()
func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadWriteCloser) (waitForAgentFunc WaitForAgentFunc, err error) {
serverInfo := comms.ServerInfo{}
agentInfo, err := comms.ServerInitialization(conn, serverInfo)
if err != nil {
return nil, err
}
agent, err := converge.admin.AddAgent(hostPrivateKey, publicId, agentInfo, conn)
converge.logStatus()
if err != nil {
return nil, err
}
publicId = agent.Info.PublicId
cleanupFunc := func() {
_ = converge.admin.RemoveAgent(publicId)
converge.logStatus()
}
defer func() {
if err != nil {
cleanupFunc()
}
}()
go func() {
comms.ListenForAgentEvents(agent.CommChannel.SideChannel,
func(info comms.EnvironmentInfo) {
agent.Info.EnvironmentInfo = info
converge.logStatus()
},
func(session comms.SessionInfo) {
log.Println("Recceived sessioninfo ", session)
converge.admin.SetSessionType(models.ClientId(session.ClientId), models.SessionType(session.SessionType))
},
func(expiry comms.ExpiryTimeUpdate) {
agent.Info.SetExpiryTime(expiry.ExpiryTime)
converge.logStatus()
},
func(heartbeat comms.HeartBeat) {
// Empty
})
}()
return WaitForAgentFunc(func() {
defer cleanupFunc()
log.Printf("agentConnection registered: '%s'\n", publicId)
for !agent.CommChannel.Session.IsClosed() {
time.Sleep(250 * time.Millisecond)
}
log.Printf("Agent disconnected")
}), nil
}
type SynchronizeStreamsFunc func()
func (converge *MatchMaker) Connect(wsProxyMode bool,
publicId models.RendezVousId, conn iowrappers2.ReadWriteAddrCloser) (clientId models.ClientId, synchronizer SynchronizeStreamsFunc, err error) {
defer func() {
if err != nil {
conn.Close()
}
}()
log.Printf("Using wsproxy protocol %v", wsProxyMode)
channel := comms.NewGOBChannel(conn)
if wsProxyMode {
err := comms.SendWithTimeout(
channel,
comms.ProtocolVersion{
Version: comms.PROTOCOL_VERSION,
})
if err != nil {
log.Printf("Error sending protocol version to client %v", err)
return "", nil, err
}
}
client, err := converge.admin.AddClient(publicId, conn)
cleanUpFunc := func() {
if client != nil {
converge.admin.RemoveClient(client.Info.ClientId)
}
converge.logStatus()
}
defer func() {
if err != nil {
cleanUpFunc()
}
}()
if err != nil {
if wsProxyMode {
_ = comms.SendWithTimeout(channel,
comms.ClientConnectionInfo{
Ok: false,
Message: err.Error(),
})
}
return "", nil, err
}
log.Printf("Connecting client and agent: '%s'\n", publicId)
if wsProxyMode {
err = comms.SendWithTimeout(channel,
comms.ClientConnectionInfo{
Ok: true,
Message: "Connecting to agent",
})
if err != nil {
return "", nil, fmt.Errorf("Error sending connection info to client: %v", err)
}
clientEnvironment, err := comms.ReceiveWithTimeout[comms.EnvironmentInfo](channel)
if err != nil {
return "", nil, fmt.Errorf("Error receiving environment info from client: %v", err)
}
client.Info.EnvironmentInfo = clientEnvironment
}
converge.logStatus()
return client.Info.ClientId, SynchronizeStreamsFunc(func() {
defer conn.Close()
defer cleanUpFunc()
client.Synchronize()
}), nil
}
func (converge *MatchMaker) logStatus() {
notification := converge.admin.CreateNotifification()
logStatusImpl(notification, converge.notifier)
}
// logging without having the lock on the administration.
func logStatusImpl(admin *models.State, notifier Notifier) {
format := "%-20s %-20s %-20s %-10s %-15s %-20s"
lines := make([]string, 0, 100)
lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
"USER", "HOST", "OS"))
for _, agent := range admin.Agents {
lines = append(lines, fmt.Sprintf(format, agent.PublicId,
agent.StartTime.Format(time.DateTime),
agent.GetExpiryTime().Format(time.DateTime),
agent.EnvironmentInfo.Username,
agent.EnvironmentInfo.Hostname,
agent.EnvironmentInfo.OS))
}
lines = append(lines, "")
format = "%-10s %-20s %-20s %-20s %-20s"
lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE"))
for _, client := range admin.Clients {
lines = append(lines, fmt.Sprintf(format,
client.ClientId,
client.PublicId,
client.StartTime.Format(time.DateTime),
client.RemoteAddr,
client.SessionType))
}
lines = append(lines, "")
for _, line := range lines {
log.Println(line)
}
notifier.Publish(admin)
}