package matchmaker

import (
	_ "embed"
	"fmt"
	"git.wamblee.org/converge/pkg/comms"
	"git.wamblee.org/converge/pkg/models"
	"git.wamblee.org/converge/pkg/server/admin"
	iowrappers2 "git.wamblee.org/converge/pkg/support/ioutils"
	"io"
	"log"
	"time"
)

// Use a fixed host key for all agents. Using a dynamic host key would be madness.
// An alternative would be to configure the host key on the server side and send it
// to agents before establishing a session.
//
//go:embed hostkey.pem
var hostPrivateKey []byte

// MatchMaker registers agents under a rendezvous id and connects clients to
// them. All bookkeeping is delegated to the administration; every change of
// state is logged and published through the notifier.
type MatchMaker struct {
	admin    admin.Admin
	notifier Notifier
}

// NewMatchMaker creates a MatchMaker with a new administration and the given
// notifier. The initial state is logged and published before returning.
func NewMatchMaker(notifier Notifier) *MatchMaker {
	converge := MatchMaker{
		// NOTE(review): this stores a copy of the value returned by
		// admin.NewAdmin(). If Admin contains a lock, copying it is unsafe
		// and the field should probably be a pointer — confirm.
		admin:    *admin.NewAdmin(),
		notifier: notifier,
	}
	converge.logStatus()
	return &converge
}

// Close closes the underlying administration.
func (converge *MatchMaker) Close() {
	converge.admin.Close()
}

// WaitForAgentFunc is returned by Register. Calling it blocks until the
// agent's session is closed and then removes the agent from the
// administration.
type WaitForAgentFunc func()

// Register adds an agent to the administration.
//
// It runs comms.ServerInitialization on conn, adds the agent under publicId
// using the embedded host key, and starts a goroutine that listens for agent
// events on the agent's side channel. From the moment the agent is added, the
// id stored in the administration (agent.Info.PublicId) is used instead of the
// publicId argument.
//
// On success it returns a WaitForAgentFunc that the caller must invoke to wait
// for the agent to disconnect and to clean up afterwards. On failure it
// returns a nil function and the error of the step that failed.
func (converge *MatchMaker) Register(publicId models.RendezVousId,
	conn io.ReadWriteCloser) (waitForAgentFunc WaitForAgentFunc, err error) {

	serverInfo := comms.ServerInfo{}
	agentInfo, err := comms.ServerInitialization(conn, serverInfo)
	if err != nil {
		return nil, err
	}

	agent, err := converge.admin.AddAgent(hostPrivateKey, publicId, agentInfo, conn)
	// The status is logged whether or not adding the agent succeeded.
	converge.logStatus()
	if err != nil {
		return nil, err
	}
	publicId = agent.Info.PublicId

	cleanupFunc := func() {
		// The result of RemoveAgent is deliberately ignored: this runs during
		// teardown and there is no caller left to report it to.
		_ = converge.admin.RemoveAgent(publicId)
		converge.logStatus()
	}
	// Safety net: removes the agent again if a later step in this function
	// returns an error. No statement below currently does so.
	defer func() {
		if err != nil {
			cleanupFunc()
		}
	}()

	// NOTE(review): the handlers below update agent.Info from this goroutine;
	// whether that is synchronised with readers depends on the models
	// package — confirm. The goroutine ends when ListenForAgentEvents returns.
	go func() {
		comms.ListenForAgentEvents(agent.CommChannel.SideChannel,
			func(info comms.EnvironmentInfo) {
				agent.Info.EnvironmentInfo = info
				converge.logStatus()
			},
			func(session comms.SessionInfo) {
				log.Println("Received sessioninfo ", session)
				converge.admin.SetSessionType(models.ClientId(session.ClientId), models.SessionType(session.SessionType))
			},
			func(expiry comms.ExpiryTimeUpdate) {
				agent.Info.SetExpiryTime(expiry.ExpiryTime)
				converge.logStatus()
			},
			func(heartbeat comms.HeartBeat) {
				// Empty
			})
	}()

	return WaitForAgentFunc(func() {
		defer cleanupFunc()
		log.Printf("agentConnection registered: '%s'\n", publicId)
		// Poll the session state every 250ms until it reports closed.
		for !agent.CommChannel.Session.IsClosed() {
			time.Sleep(250 * time.Millisecond)
		}
		log.Printf("Agent disconnected")
	}), nil
}

// SynchronizeStreamsFunc is returned by Connect. Calling it runs
// client.Synchronize() and, once that returns, removes the client from the
// administration and then closes the client connection.
type SynchronizeStreamsFunc func()

// Connect adds a client for the agent registered under publicId.
//
// When wsProxyMode is true, a GOB channel on conn is used to send the protocol
// version first, then a ClientConnectionInfo telling the client whether it was
// added, and finally to receive the client's EnvironmentInfo. When
// wsProxyMode is false none of these messages are exchanged.
//
// On success it returns the id of the new client and a SynchronizeStreamsFunc
// that the caller must invoke. On any failure conn is closed, a client that
// was already added is removed again, and the error is returned.
func (converge *MatchMaker) Connect(wsProxyMode bool,
	publicId models.RendezVousId, conn iowrappers2.ReadWriteAddrCloser) (clientId models.ClientId, synchronizer SynchronizeStreamsFunc, err error) {

	// Relies on the named result err: every failing return below sets it
	// before deferred functions run.
	defer func() {
		if err != nil {
			conn.Close()
		}
	}()

	log.Printf("Using wsproxy protocol %v", wsProxyMode)

	channel := comms.NewGOBChannel(conn)
	if wsProxyMode {
		err = comms.SendWithTimeout(
			channel,
			comms.ProtocolVersion{
				Version: comms.PROTOCOL_VERSION,
			})
		if err != nil {
			log.Printf("Error sending protocol version to client %v", err)
			return "", nil, err
		}
	}

	client, err := converge.admin.AddClient(publicId, conn)
	cleanUpFunc := func() {
		// client is nil when AddClient failed; the status is logged anyway.
		if client != nil {
			converge.admin.RemoveClient(client.Info.ClientId)
		}
		converge.logStatus()
	}
	defer func() {
		if err != nil {
			cleanUpFunc()
		}
	}()

	if err != nil {
		if wsProxyMode {
			// Best effort: the client is told why it was rejected, but the
			// AddClient error is what gets returned to the caller.
			_ = comms.SendWithTimeout(channel, comms.ClientConnectionInfo{
				Ok:      false,
				Message: err.Error(),
			})
		}
		return "", nil, err
	}
	log.Printf("Connecting client and agent: '%s'\n", publicId)

	if wsProxyMode {
		err = comms.SendWithTimeout(channel, comms.ClientConnectionInfo{
			Ok:      true,
			Message: "Connecting to agent",
		})
		if err != nil {
			return "", nil, fmt.Errorf("Error sending connection info to client: %w", err)
		}
		// A separate error variable avoids shadowing the named result err.
		clientEnvironment, recvErr := comms.ReceiveWithTimeout[comms.EnvironmentInfo](channel)
		if recvErr != nil {
			return "", nil, fmt.Errorf("Error receiving environment info from client: %w", recvErr)
		}
		client.Info.EnvironmentInfo = clientEnvironment
	}
	converge.logStatus()

	return client.Info.ClientId,
		SynchronizeStreamsFunc(func() {
			// Deferred calls run in reverse order: the client is removed
			// first, then the connection is closed.
			defer conn.Close()
			defer cleanUpFunc()
			client.Synchronize()
		}), nil
}

// logStatus obtains the current state from the administration and hands it to
// logStatusImpl together with the notifier.
func (converge *MatchMaker) logStatus() {
	notification := converge.admin.CreateNotifification()
	logStatusImpl(notification, converge.notifier)
}

// logging without having the lock on the
administration. func logStatusImpl(admin *models.State, notifier Notifier) { format := "%-20s %-20s %-20s %-10s %-15s %-20s" lines := make([]string, 0, 100) lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME", "USER", "HOST", "OS")) for _, agent := range admin.Agents { lines = append(lines, fmt.Sprintf(format, agent.PublicId, agent.StartTime.Format(time.DateTime), agent.GetExpiryTime().Format(time.DateTime), agent.EnvironmentInfo.Username, agent.EnvironmentInfo.Hostname, agent.EnvironmentInfo.OS)) } lines = append(lines, "") format = "%-10s %-20s %-20s %-20s %-20s" lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")) for _, client := range admin.Clients { lines = append(lines, fmt.Sprintf(format, client.ClientId, client.PublicId, client.StartTime.Format(time.DateTime), client.RemoteAddr, client.SessionType)) } lines = append(lines, "") for _, line := range lines { log.Println(line) } notifier.Publish(admin) }