package matchmaker

import (
	"fmt"
	"io"
	"log"
	"time"

	"git.wamblee.org/converge/pkg/comms"
	"git.wamblee.org/converge/pkg/models"
	"git.wamblee.org/converge/pkg/server/admin"
	iowrappers2 "git.wamblee.org/converge/pkg/support/iowrappers"
)

// MatchMaker registers agents and connects clients to them. All bookkeeping
// is delegated to an admin.Admin; every state change is logged and published
// to the Notifier through logStatus.
type MatchMaker struct {
	admin    admin.Admin
	notifier Notifier
}

// NewMatchMaker creates a MatchMaker backed by a new administration and
// immediately logs and publishes the initial state to notifier.
func NewMatchMaker(notifier Notifier) *MatchMaker {
	// NOTE(review): this stores a copy of the value returned by NewAdmin. If
	// Admin contains a lock, storing the pointer would be safer — confirm.
	converge := MatchMaker{
		admin:    *admin.NewAdmin(),
		notifier: notifier,
	}
	converge.logStatus()
	return &converge
}

// Register handles an agent connection for its whole lifetime.
//
// It runs the server side of the initialization exchange on conn, adds the
// agent to the administration under publicId, starts a goroutine that
// processes events arriving on the agent's side channel, and then blocks
// until the agent's session is reported closed. On return the agent is
// removed from the administration again.
//
// An error is returned when the initialization exchange or adding the agent
// fails; otherwise the result is nil once the session has closed.
func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadWriteCloser) error {
	serverInfo := comms.ServerInfo{}
	agentInfo, err := comms.ServerInitialization(conn, serverInfo)
	if err != nil {
		return err
	}

	agent, err := converge.admin.AddAgent(publicId, agentInfo, conn)
	// The status is logged before the error check, so a failed AddAgent also
	// produces a status report.
	converge.logStatus()
	if err != nil {
		return err
	}

	// From here on use the id stored in the registered agent for cleanup and
	// logging rather than the id that was requested.
	publicId = agent.Info.PublicId
	defer func() {
		converge.admin.RemoveAgent(publicId)
		converge.logStatus()
	}()

	// Process agent events in the background while this call waits for the
	// session to end.
	// NOTE(review): the callbacks update agent.Info from this goroutine with no
	// synchronisation visible here — confirm this is safe with respect to
	// concurrent status snapshots. The goroutine is not stopped explicitly;
	// presumably ListenForAgentEvents returns when the side channel closes.
	go func() {
		comms.ListenForAgentEvents(agent.CommChannel.SideChannel,
			func(info comms.EnvironmentInfo) {
				agent.Info.EnvironmentInfo = info
				converge.logStatus()
			},
			func(session comms.SessionInfo) {
				log.Println("Recceived sessioninfo ", session)
				converge.admin.SetSessionType(models.ClientId(session.ClientId), models.SessionType(session.SessionType))
			},
			func(expiry comms.ExpiryTimeUpdate) {
				agent.Info.SetExpiryTime(expiry.ExpiryTime)
				converge.logStatus()
			})
	}()

	// Logged synchronously: a goroutine for a single log line only makes the
	// ordering of log output nondeterministic.
	log.Printf("agentConnection registered: '%s'\n", publicId)

	// Poll until the agent session is closed.
	for !agent.CommChannel.Session.IsClosed() {
		time.Sleep(250 * time.Millisecond)
	}
	return nil
}

// Connect handles a client connection that wants to reach the agent
// registered under publicId. conn is always closed when Connect returns.
//
// When wsProxyMode is true, extra messages are exchanged with the client over
// a GOB channel on conn: the protocol version is sent first, then a
// ClientConnectionInfo telling whether the client could be added, and finally
// the client's EnvironmentInfo is received and stored on the client. When
// wsProxyMode is false none of these messages are exchanged.
//
// After the client has been added, client.Synchronize is called (presumably
// this relays data between client and agent until one side finishes —
// confirm), and the client is removed from the administration on return.
//
// Errors from the message exchange or from adding the client are returned;
// otherwise the result is nil.
func (converge *MatchMaker) Connect(wsProxyMode bool, publicId models.RendezVousId, conn iowrappers2.ReadWriteAddrCloser) error {
	defer conn.Close()

	log.Printf("Using wsproxy protocol %v", wsProxyMode)
	channel := comms.NewGOBChannel(conn)
	if wsProxyMode {
		err := comms.SendWithTimeout(
			channel,
			comms.ProtocolVersion{
				Version: comms.PROTOCOL_VERSION,
			})
		if err != nil {
			log.Printf("Error sending protocol version to client %v", err)
			return err
		}
	}

	client, err := converge.admin.AddClient(publicId, conn)
	if err != nil {
		if wsProxyMode {
			// Best effort: the AddClient error is what gets returned, so a
			// failure to deliver this notification is deliberately ignored.
			_ = comms.SendWithTimeout(channel, comms.ClientConnectionInfo{
				Ok:      false,
				Message: err.Error(),
			})
		}
		return err
	}
	defer func() {
		converge.admin.RemoveClient(client)
		converge.logStatus()
	}()

	log.Printf("Connecting client and agent: '%s'\n", publicId)
	if wsProxyMode {
		err = comms.SendWithTimeout(channel, comms.ClientConnectionInfo{
			Ok:      true,
			Message: "Connecting to agent",
		})
		if err != nil {
			// %w keeps the underlying error available to errors.Is/errors.As.
			return fmt.Errorf("Error sending connection info to client: %w", err)
		}
		clientEnvironment, err := comms.ReceiveWithTimeout[comms.EnvironmentInfo](channel)
		if err != nil {
			return fmt.Errorf("Error receiving environment info from client: %w", err)
		}
		client.Info.EnvironmentInfo = clientEnvironment
	}

	converge.logStatus()
	client.Synchronize()
	return nil
}

// logStatus obtains a state notification from the administration and hands it
// to logStatusImpl together with the notifier.
func (converge *MatchMaker) logStatus() {
	notification := converge.admin.CreateNotifification()
	logStatusImpl(notification, converge.notifier)
}

// logging without having the lock on the administration.
func logStatusImpl(admin *models.State, notifier Notifier) { format := "%-20s %-20s %-20s %-10s %-15s %-20s" lines := make([]string, 0, 100) lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME", "USER", "HOST", "OS")) for _, agent := range admin.Agents { lines = append(lines, fmt.Sprintf(format, agent.PublicId, agent.StartTime.Format(time.DateTime), agent.GetExpiryTime().Format(time.DateTime), agent.EnvironmentInfo.Username, agent.EnvironmentInfo.Hostname, agent.EnvironmentInfo.OS)) } lines = append(lines, "") format = "%-10s %-20s %-20s %-20s %-20s" lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")) for _, client := range admin.Clients { lines = append(lines, fmt.Sprintf(format, client.ClientId, client.PublicId, client.StartTime.Format(time.DateTime), client.RemoteAddr, client.SessionType)) } lines = append(lines, "") for _, line := range lines { log.Println(line) } notifier.Publish(admin) }