From 69a8a1aad3f6146bad9eac9baacc9b33ee33dd69 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sun, 11 Aug 2024 16:17:11 +0200 Subject: [PATCH] much more clean handling of concurrency in the converge server by separating the administration from the matchmaking by putting admin in a separate package. --- cmd/converge/converge.go | 2 +- cmd/templaterender/render.go | 1 + pkg/models/client.go | 1 + pkg/models/state.go | 1 - pkg/server/admin/admin.go | 237 ++++++++++++++++++ pkg/server/converge/admin.go | 370 ----------------------------- pkg/server/converge/matchmaker.go | 164 +++++++++++++ pkg/server/converge/websessions.go | 1 + 8 files changed, 405 insertions(+), 372 deletions(-) create mode 100644 pkg/server/admin/admin.go delete mode 100644 pkg/server/converge/admin.go create mode 100644 pkg/server/converge/matchmaker.go diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go index 9000096..ae3c343 100644 --- a/cmd/converge/converge.go +++ b/cmd/converge/converge.go @@ -96,7 +96,7 @@ func main() { } notifications := NewStateNotifier() - admin := converge.NewAdmin(notifications) + admin := converge.NewMatchMaker(notifications) websessions := converge.NewWebSessions(notifications.webNotificationChannel) // For agents connecting diff --git a/cmd/templaterender/render.go b/cmd/templaterender/render.go index 0a9d84b..9e9cd38 100644 --- a/cmd/templaterender/render.go +++ b/cmd/templaterender/render.go @@ -96,6 +96,7 @@ func main() { state.Agents = append(state.Agents, agent) client := models.Client{ Guid: strconv.Itoa(rand.Int()), + RemoteAddr: "10.1.3.3", PublicId: "c1", AgentGuid: "12342342", ClientId: "3", diff --git a/pkg/models/client.go b/pkg/models/client.go index bbd72c7..504aacc 100644 --- a/pkg/models/client.go +++ b/pkg/models/client.go @@ -7,6 +7,7 @@ import ( type Client struct { Guid string + RemoteAddr string PublicId string ClientId string AgentGuid string diff --git a/pkg/models/state.go b/pkg/models/state.go index 529bf28..e407dca 100644 --- a/pkg/models/state.go +++ b/pkg/models/state.go @@ -6,5 +6,4 @@ type State struct { Agents []Agent Clients []Client - Ascii string } diff --git a/pkg/server/admin/admin.go b/pkg/server/admin/admin.go new file mode 100644 index 0000000..7e0f621 --- /dev/null +++ b/pkg/server/admin/admin.go @@ -0,0 +1,237 @@ +package admin + +import ( + "converge/pkg/comms" + "converge/pkg/models" + "converge/pkg/support/concurrency" + iowrappers2 "converge/pkg/support/iowrappers" + "fmt" + "io" + "log" + "math/rand" + "net" + "strconv" + "sync" + "time" +) + +type agentConnection struct { + models.Agent + // server session + CommChannel comms.CommChannel +} + +var agentIdGenerator = concurrency.NewAtomicCounter() +var clientIdGenerator = concurrency.NewAtomicCounter() + +type ClientConnection struct { + models.Client + AgentConnection net.Conn + ClientConnection iowrappers2.ReadWriteAddrCloser +} + +func newAgent(commChannel comms.CommChannel, publicId string, agentInfo comms.EnvironmentInfo) *agentConnection { + return &agentConnection{ + Agent: models.Agent{ + Guid: strconv.Itoa(rand.Int()), + PublicId: publicId, + StartTime: time.Now(), + EnvironmentInfo: agentInfo, + }, + CommChannel: commChannel, + } +} + +func newClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser, + agentConn net.Conn, agentGuid string) *ClientConnection { + return &ClientConnection{ + Client: models.Client{ + Guid: strconv.Itoa(rand.Int()), + RemoteAddr: clientConn.RemoteAddr().String(), + PublicId: publicId, + AgentGuid: agentGuid, + ClientId: strconv.Itoa(clientIdGenerator.IncrementAndGet()), + StartTime: time.Now(), + }, + AgentConnection: agentConn, + ClientConnection: clientConn, + } +} + +type Admin struct { + // map of public id to agent + mutex sync.Mutex + agents map[string]*agentConnection + clients []*ClientConnection +} + +func NewAdmin() *Admin { + return &Admin{ + mutex: sync.Mutex{}, + agents: make(map[string]*agentConnection), + clients: make([]*ClientConnection, 0), // not strictly needed + } +} + +func (admin *Admin) CreateNotifification() *models.State { + admin.mutex.Lock() + defer admin.mutex.Unlock() + state := models.State{} + state.Agents = make([]models.Agent, 0, len(admin.agents)) + state.Clients = make([]models.Client, 0, len(admin.clients)) + for _, agent := range admin.agents { + state.Agents = append(state.Agents, agent.Agent) + } + for _, client := range admin.clients { + state.Clients = append(state.Clients, client.Client) + } + return &state +} + +func (admin *Admin) getFreeId(publicId string) (string, error) { + usedIds := make(map[string]bool) + for _, agent := range admin.agents { + usedIds[agent.PublicId] = true + } + if !usedIds[publicId] { + return publicId, nil + } + if usedIds[publicId] { + for i := 0; i < 100; i++ { + candidate := publicId + "-" + strconv.Itoa(i) + if !usedIds[candidate] { + return candidate, nil + } + } + } + return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId) +} + +func (admin *Admin) AddAgent(publicId string, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*agentConnection, error) { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + newPublicId, err := admin.getFreeId(publicId) + if err == nil { + message := "Requested id is accepted" + if publicId != newPublicId { + message = "The server allocated a new id." + } + publicId = newPublicId + comms.SendRegistrationMessage(conn, comms.AgentRegistration{ + Ok: true, + Message: message, + Id: publicId, + }) + } else { + comms.SendRegistrationMessage(conn, comms.AgentRegistration{ + Ok: false, + Message: err.Error(), + }) + } + agent := admin.agents[publicId] + if agent != nil { + return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId) + } + + commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn) + + if err != nil { + return nil, err + } + agent = newAgent(commChannel, publicId, agentInfo) + admin.agents[publicId] = agent + return agent, nil +} + +func (admin *Admin) AddClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser) (*ClientConnection, error) { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + agent := admin.agents[publicId] + if agent == nil { + // we should setup on-demend connections ot agents later. + return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId) + } + + agentConn, err := admin.getAgentConnection(agent) + if err != nil { + return nil, err + } + log.Println("Successful websocket connection to agent") + + log.Println("Sending connection information to agent") + + client := newClient(publicId, clientConn, agentConn, agent.Guid) + + // Before using this connection for SSH we use it to send client metadata to the + // agent + err = comms.SendClientInfo(agentConn, comms.ClientInfo{ + ClientId: client.ClientId, + }) + if err != nil { + return nil, err + } + + admin.clients = append(admin.clients, client) + return client, nil +} + +func (admin *Admin) getAgentConnection(agent *agentConnection) (net.Conn, error) { + agentConn, err := agent.CommChannel.Session.Open() + count := 0 + for err != nil && count < 10 { + log.Printf("Retrying connection to agent: %v", err) + time.Sleep(250 * time.Millisecond) + count++ + agentConn, err = agent.CommChannel.Session.Open() + } + return agentConn, err +} + +func (admin *Admin) RemoveAgent(publicId string) error { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + agent := admin.agents[publicId] + if agent == nil { + return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) + } + log.Printf("Removing agent: '%s'", publicId) + err := agent.CommChannel.Session.Close() + if err != nil { + log.Printf("Could not close yamux client session for '%s'\n", publicId) + } + delete(admin.agents, publicId) + return nil +} + +func (admin *Admin) RemoveClient(client *ClientConnection) error { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + log.Printf("Removing client: '%s' created at %s\n", client.ClientId, + client.StartTime.Format(time.DateTime)) + // try to explicitly close connection to the agent. + _ = client.AgentConnection.Close() + _ = client.ClientConnection.Close() + + for i, _client := range admin.clients { + if _client.ClientId == client.ClientId { + admin.clients = append(admin.clients[:i], admin.clients[i+1:]...) + break + } + } + return nil +} + +func (admin *Admin) SetSessionType(clientId string, sessionType string) { + admin.mutex.Lock() + defer admin.mutex.Unlock() + for _, client := range admin.clients { + if client.ClientId == clientId { + client.SessionType = sessionType + break + } + } +} diff --git a/pkg/server/converge/admin.go b/pkg/server/converge/admin.go deleted file mode 100644 index 9ea0d76..0000000 --- a/pkg/server/converge/admin.go +++ /dev/null @@ -1,370 +0,0 @@ -package converge - -import ( - "converge/pkg/comms" - "converge/pkg/models" - "converge/pkg/support/concurrency" - iowrappers2 "converge/pkg/support/iowrappers" - "fmt" - "io" - "log" - "math/rand" - "net" - "strconv" - "strings" - "sync" - "time" -) - -type AgentConnection struct { - models.Agent - // server session - commChannel comms.CommChannel -} - -var agentIdGenerator = concurrency.NewAtomicCounter() -var clientIdGenerator = concurrency.NewAtomicCounter() - -type ClientConnection struct { - models.Client - agent net.Conn - client iowrappers2.ReadWriteAddrCloser -} - -func NewAgent(commChannel comms.CommChannel, publicId string, agentInfo comms.EnvironmentInfo) *AgentConnection { - return &AgentConnection{ - Agent: models.Agent{ - Guid: strconv.Itoa(rand.Int()), - PublicId: publicId, - StartTime: time.Now(), - EnvironmentInfo: agentInfo, - }, - commChannel: commChannel, - } -} - -func NewClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser, - agentConn net.Conn, agentGuid string) *ClientConnection { - return &ClientConnection{ - Client: models.Client{ - Guid: strconv.Itoa(rand.Int()), - PublicId: publicId, - AgentGuid: agentGuid, - ClientId: strconv.Itoa(clientIdGenerator.IncrementAndGet()), - StartTime: time.Now(), - }, - agent: agentConn, - client: clientConn, - } -} - -type Admin struct { - // map of public id to agent - mutex sync.Mutex - agents map[string]*AgentConnection - clients []*ClientConnection - notifications Notifier -} - -func NewAdmin(notifications Notifier) *Admin { - admin := Admin{ - mutex: sync.Mutex{}, - agents: make(map[string]*AgentConnection), - clients: make([]*ClientConnection, 0), // not strictly needed - notifications: notifications, - } - admin.logStatus() - return &admin -} - -func (admin *Admin) createNotification() *models.State { - state := models.State{} - state.Agents = make([]models.Agent, 0, len(admin.agents)) - state.Clients = make([]models.Client, 0, len(admin.clients)) - for _, agent := range admin.agents { - state.Agents = append(state.Agents, agent.Agent) - } - for _, client := range admin.clients { - state.Clients = append(state.Clients, client.Client) - } - return &state -} - -func (admin *Admin) logStatus() { - format := "%-20s %-20s %-20s %-10s %-15s %-20s" - - lines := make([]string, 0, 100) - - lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME", - "USER", "HOST", "OS")) - for _, agent := range admin.agents { - agent.commChannel.Session.RemoteAddr() - lines = append(lines, fmt.Sprintf(format, agent.PublicId, - agent.StartTime.Format(time.DateTime), - agent.ExpiryTime.Format(time.DateTime), - agent.EnvironmentInfo.Username, - agent.EnvironmentInfo.Hostname, - agent.EnvironmentInfo.OS)) - } - lines = append(lines, "") - format = "%-10s %-20s %-20s %-20s %-20s" - lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")) - for _, client := range admin.clients { - lines = append(lines, fmt.Sprintf(format, - client.ClientId, - client.PublicId, - client.StartTime.Format(time.DateTime), - client.client.RemoteAddr(), - client.SessionType)) - } - lines = append(lines, "") - for _, line := range lines { - log.Println(line) - } - - notification := admin.createNotification() - notification.Ascii = strings.Join(lines, "\n") - admin.notifications.Publish(notification) -} - -func (admin *Admin) getFreeId(publicId string) (string, error) { - usedIds := make(map[string]bool) - for _, agent := range admin.agents { - usedIds[agent.PublicId] = true - } - if !usedIds[publicId] { - return publicId, nil - } - if usedIds[publicId] { - for i := 0; i < 100; i++ { - candidate := publicId + "-" + strconv.Itoa(i) - if !usedIds[candidate] { - return candidate, nil - } - } - } - return "", fmt.Errorf("Could not allocate agent id based on requested public id '%s'", publicId) -} - -func (admin *Admin) addAgent(publicId string, agentInfo comms.EnvironmentInfo, conn io.ReadWriteCloser) (*AgentConnection, error) { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - newPublicId, err := admin.getFreeId(publicId) - if err == nil { - message := "Requested id is accepted" - if publicId != newPublicId { - message = "The server allocated a new id." - } - publicId = newPublicId - comms.SendRegistrationMessage(conn, comms.AgentRegistration{ - Ok: true, - Message: message, - Id: publicId, - }) - } else { - comms.SendRegistrationMessage(conn, comms.AgentRegistration{ - Ok: false, - Message: err.Error(), - }) - } - agent := admin.agents[publicId] - if agent != nil { - return nil, fmt.Errorf("SHOULD NEVER GET HERE!!!, A different agent with same PublicId '%s' already registered", publicId) - } - - commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn) - - if err != nil { - return nil, err - } - agent = NewAgent(commChannel, publicId, agentInfo) - admin.agents[publicId] = agent - admin.logStatus() - return agent, nil -} - -func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser) (*ClientConnection, error) { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - agent := admin.agents[publicId] - if agent == nil { - // we should setup on-demend connections ot agents later. - return nil, fmt.Errorf("No agent found for rendez-vous id '%s'", publicId) - } - - agentConn, err := admin.getAgentConnection(agent) - if err != nil { - return nil, err - } - log.Println("Successful websocket connection to agent") - - log.Println("Sending connection information to agent") - - client := NewClient(publicId, clientConn, agentConn, agent.Guid) - - // Before using this connection for SSH we use it to send client metadata to the - // agent - err = comms.SendClientInfo(agentConn, comms.ClientInfo{ - ClientId: client.ClientId, - }) - if err != nil { - return nil, err - } - - admin.clients = append(admin.clients, client) - admin.logStatus() - return client, nil -} - -func (admin *Admin) getAgentConnection(agent *AgentConnection) (net.Conn, error) { - agentConn, err := agent.commChannel.Session.Open() - count := 0 - for err != nil && count < 10 { - log.Printf("Retrying connection to agent: %v", err) - time.Sleep(250 * time.Millisecond) - count++ - agentConn, err = agent.commChannel.Session.Open() - } - return agentConn, err -} - -func (admin *Admin) RemoveAgent(publicId string) error { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - agent := admin.agents[publicId] - if agent == nil { - return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) - } - log.Printf("Removing agent: '%s'", publicId) - err := agent.commChannel.Session.Close() - if err != nil { - log.Printf("Could not close yamux client session for '%s'\n", publicId) - } - delete(admin.agents, publicId) - admin.logStatus() - return nil -} - -func (admin *Admin) RemoveClient(client *ClientConnection) error { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - log.Printf("Removing client: '%s' created at %s\n", client.ClientId, - client.StartTime.Format(time.DateTime)) - // try to explicitly close connection to the agent. - _ = client.agent.Close() - _ = client.client.Close() - - for i, _client := range admin.clients { - if _client.ClientId == client.ClientId { - admin.clients = append(admin.clients[:i], admin.clients[i+1:]...) - break - } - } - admin.logStatus() - return nil -} - -func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error { - - serverInfo := comms.ServerInfo{} - - agentInfo, err := comms.ServerInitialization(conn, serverInfo) - if err != nil { - return err - } - - agent, err := admin.addAgent(publicId, agentInfo, conn) - if err != nil { - return err - } - publicId = agent.PublicId - defer func() { - admin.RemoveAgent(publicId) - }() - - go func() { - comms.ListenForAgentEvents(agent.commChannel.SideChannel, - func(info comms.EnvironmentInfo) { - agent.EnvironmentInfo = info - admin.logStatus() - }, - func(session comms.SessionInfo) { - log.Println("Recceived sessioninfo ", session) - for _, client := range admin.clients { - // a bit hacky. There should be at most one client that has an unset session - // Very unlikely for multiple sessions to start at the same point in time. - if client.ClientId == session.ClientId { - client.SessionType = session.SessionType - break - } - } - }, - func(expiry comms.ExpiryTimeUpdate) { - agent.ExpiryTime = expiry.ExpiryTime - admin.logStatus() - }) - }() - - go log.Printf("AgentConnection registered: '%s'\n", publicId) - for !agent.commChannel.Session.IsClosed() { - time.Sleep(250 * time.Millisecond) - } - return nil -} - -func (admin *Admin) Connect(wsProxyMode bool, publicId string, conn iowrappers2.ReadWriteAddrCloser) error { - defer conn.Close() - - log.Printf("Using wsproxy protocol %v", wsProxyMode) - channel := comms.NewGOBChannel(conn) - if wsProxyMode { - err := comms.SendWithTimeout( - channel, - comms.ProtocolVersion{ - Version: comms.PROTOCOL_VERSION, - }) - if err != nil { - log.Printf("Error sending protocol version to client %v", err) - return err - } - } - - client, err := admin.addClient(publicId, conn) - if err != nil { - if wsProxyMode { - _ = comms.SendWithTimeout(channel, - comms.ClientConnectionInfo{ - Ok: false, - Message: err.Error(), - }) - } - return err - } - defer func() { - admin.RemoveClient(client) - }() - log.Printf("Connecting client and agent: '%s'\n", publicId) - if wsProxyMode { - err = comms.SendWithTimeout(channel, - comms.ClientConnectionInfo{ - Ok: true, - Message: "Connecting to agent", - }) - if err != nil { - return fmt.Errorf("Error sending connection info to client: %v", err) - } - clientEnvironment, err := comms.ReceiveWithTimeout[comms.EnvironmentInfo](channel) - if err != nil { - return fmt.Errorf("Error receiving environment info from client: %v", err) - } - client.EnvironmentInfo = clientEnvironment - admin.logStatus() - } - - iowrappers2.SynchronizeStreams("client -- agent", client.client, client.agent) - return nil -} diff --git a/pkg/server/converge/matchmaker.go b/pkg/server/converge/matchmaker.go new file mode 100644 index 0000000..fcde691 --- /dev/null +++ b/pkg/server/converge/matchmaker.go @@ -0,0 +1,164 @@ +package converge + +import ( + "converge/pkg/comms" + "converge/pkg/models" + "converge/pkg/server/admin" + iowrappers2 "converge/pkg/support/iowrappers" + "fmt" + "io" + "log" + "time" +) + +type MatchMaker struct { + admin admin.Admin + notifier Notifier +} + +func NewMatchMaker(notifier Notifier) *MatchMaker { + converge := MatchMaker{ + admin: *admin.NewAdmin(), + notifier: notifier, + } + converge.logStatus() + return &converge +} + +func (converge *MatchMaker) Register(publicId string, conn io.ReadWriteCloser) error { + + serverInfo := comms.ServerInfo{} + + agentInfo, err := comms.ServerInitialization(conn, serverInfo) + if err != nil { + return err + } + + agent, err := converge.admin.AddAgent(publicId, agentInfo, conn) + converge.logStatus() + if err != nil { + return err + } + publicId = agent.PublicId + defer func() { + converge.admin.RemoveAgent(publicId) + converge.logStatus() + }() + + go func() { + comms.ListenForAgentEvents(agent.CommChannel.SideChannel, + func(info comms.EnvironmentInfo) { + agent.EnvironmentInfo = info + converge.logStatus() + }, + func(session comms.SessionInfo) { + log.Println("Recceived sessioninfo ", session) + converge.admin.SetSessionType(session.ClientId, session.SessionType) + }, + func(expiry comms.ExpiryTimeUpdate) { + agent.ExpiryTime = expiry.ExpiryTime + converge.logStatus() + }) + }() + + go log.Printf("agentConnection registered: '%s'\n", publicId) + for !agent.CommChannel.Session.IsClosed() { + time.Sleep(250 * time.Millisecond) + } + return nil +} + +func (converge *MatchMaker) Connect(wsProxyMode bool, publicId string, conn iowrappers2.ReadWriteAddrCloser) error { + defer conn.Close() + + log.Printf("Using wsproxy protocol %v", wsProxyMode) + channel := comms.NewGOBChannel(conn) + if wsProxyMode { + err := comms.SendWithTimeout( + channel, + comms.ProtocolVersion{ + Version: comms.PROTOCOL_VERSION, + }) + if err != nil { + log.Printf("Error sending protocol version to client %v", err) + return err + } + } + + client, err := converge.admin.AddClient(publicId, conn) + converge.logStatus() + if err != nil { + if wsProxyMode { + _ = comms.SendWithTimeout(channel, + comms.ClientConnectionInfo{ + Ok: false, + Message: err.Error(), + }) + } + return err + } + defer func() { + converge.admin.RemoveClient(client) + converge.logStatus() + }() + log.Printf("Connecting client and agent: '%s'\n", publicId) + if wsProxyMode { + err = comms.SendWithTimeout(channel, + comms.ClientConnectionInfo{ + Ok: true, + Message: "Connecting to agent", + }) + if err != nil { + return fmt.Errorf("Error sending connection info to client: %v", err) + } + clientEnvironment, err := comms.ReceiveWithTimeout[comms.EnvironmentInfo](channel) + if err != nil { + return fmt.Errorf("Error receiving environment info from client: %v", err) + } + client.EnvironmentInfo = clientEnvironment + converge.logStatus() + } + + iowrappers2.SynchronizeStreams("client -- agent", client.ClientConnection, client.AgentConnection) + return nil +} + +func (converge *MatchMaker) logStatus() { + notification := converge.admin.CreateNotifification() + logStatusImpl(notification, converge.notifier) +} + +// logging without having the lock on the administration. +func logStatusImpl(admin *models.State, notifier Notifier) { + format := "%-20s %-20s %-20s %-10s %-15s %-20s" + + lines := make([]string, 0, 100) + + lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME", + "USER", "HOST", "OS")) + for _, agent := range admin.Agents { + lines = append(lines, fmt.Sprintf(format, agent.PublicId, + agent.StartTime.Format(time.DateTime), + agent.ExpiryTime.Format(time.DateTime), + agent.EnvironmentInfo.Username, + agent.EnvironmentInfo.Hostname, + agent.EnvironmentInfo.OS)) + } + lines = append(lines, "") + format = "%-10s %-20s %-20s %-20s %-20s" + lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")) + for _, client := range admin.Clients { + lines = append(lines, fmt.Sprintf(format, + client.ClientId, + client.PublicId, + client.StartTime.Format(time.DateTime), + client.RemoteAddr, + client.SessionType)) + } + lines = append(lines, "") + for _, line := range lines { + log.Println(line) + } + + notifier.Publish(admin) +} diff --git a/pkg/server/converge/websessions.go b/pkg/server/converge/websessions.go index 504bafb..27dfdd3 100644 --- a/pkg/server/converge/websessions.go +++ b/pkg/server/converge/websessions.go @@ -87,6 +87,7 @@ func (session *WebSession) WriteNotifications(location *time.Location, cancel co case notification, ok := <-session.notifications: if !ok { log.Println("channel closed") + return } //log.Println("Got notification: ", notification) err := templates.State(notification, location).Render(context.Background(), session.conn)