From d6d2d5648c2d4ec6987f4eabad8d9307c58da6b4 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Tue, 30 Jul 2024 21:51:30 +0200 Subject: [PATCH] Live updates of the sessions. V1 in ascii-art. To be improved. --- cmd/agent/agent.go | 2 +- cmd/converge/converge.go | 13 +++- cmd/converge/sessionhandler.go | 35 ++++------ pkg/models/state.go | 7 ++ pkg/server/converge/admin.go | 63 ++++++++++++------ pkg/server/converge/websessions.go | 96 ++++++++++++++++++++++++++++ pkg/server/templates/about.templ | 2 +- pkg/server/templates/downloads.templ | 12 ++-- pkg/server/templates/sessions.templ | 2 + pkg/server/templates/usage.templ | 6 +- 10 files changed, 184 insertions(+), 54 deletions(-) create mode 100644 pkg/models/state.go create mode 100644 pkg/server/converge/websessions.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index e78b3a9..0401383 100755 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -199,7 +199,7 @@ func main() { pprofPort, ok := os.LookupEnv("PPROF_PORT") if ok { - log.Printf("Enablilng pprof on localhost:%s", pprofPort) + log.Printf("Enabllng pprof on localhost:%s", pprofPort) go func() { log.Println(http.ListenAndServe("localhost:"+pprofPort, nil)) }() diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go index 4cc0592..35ebfde 100644 --- a/cmd/converge/converge.go +++ b/cmd/converge/converge.go @@ -2,6 +2,7 @@ package main import ( "converge/pkg/comms" + "converge/pkg/models" "converge/pkg/server/converge" "converge/pkg/support/websocketutil" "fmt" @@ -95,7 +96,9 @@ func main() { log.Printf("Using username '%s' and password '%s'", userPassword.Username, userPassword.Password) - admin := converge.NewAdmin() + notifications := make(chan *models.State, 10) + admin := converge.NewAdmin(notifications) + websessions := converge.NewWebSessions(notifications) // For agents connecting registrationService := websocketutil.WebSocketService{ @@ -132,8 +135,12 @@ func main() { // for the web browser getting live status updates. sessionService := websocketutil.WebSocketService{ - Handler: sessionHandler, - Text: true, + Handler: func(w http.ResponseWriter, r *http.Request, conn net.Conn) { + websession := websessions.NewSession(conn) + defer websessions.SessionClosed(websession) + websession.WriteNotifications() + }, + Text: true, } // websocket endpoints diff --git a/cmd/converge/sessionhandler.go b/cmd/converge/sessionhandler.go index dc90982..66a4efd 100644 --- a/cmd/converge/sessionhandler.go +++ b/cmd/converge/sessionhandler.go @@ -1,7 +1,6 @@ package main import ( - "encoding/json" "log" "net" "net/http" @@ -16,33 +15,25 @@ type Message struct { func sessionHandler(w http.ResponseWriter, r *http.Request, conn net.Conn) { log.Println("Got sessions websocket connection") + go func() { + for { + b := make([]byte, 1024) + log.Printf("Reading from %v", conn) + _, err := conn.Read(b) + if err != nil { + return + } + } + }() i := 0 for { time.Sleep(1 * time.Second) - message := Message{ - Type: "update", - Content: ` + message := `
New data: ` + strconv.Itoa(i) + `
-`, - } - _, err := json.Marshal(message) - if err != nil { - log.Printf("ERROR marshalling json: %v", err) - return - } - //conn.Write(data) - go func() { - for { - b := make([]byte, 1024) - _, err := conn.Read(b) - if err != nil { - return - } - } - }() - _, err = conn.Write([]byte(message.Content)) +` + _, err := conn.Write([]byte(message)) if err == nil { _, err = conn.Write([]byte("\n")) } diff --git a/pkg/models/state.go b/pkg/models/state.go new file mode 100644 index 0000000..70d1cfd --- /dev/null +++ b/pkg/models/state.go @@ -0,0 +1,7 @@ +package models + +type State struct { + Agents []Agent + Clients []Client + Ascii string +} diff --git a/pkg/server/converge/admin.go b/pkg/server/converge/admin.go index df59903..0dbf315 100644 --- a/pkg/server/converge/admin.go +++ b/pkg/server/converge/admin.go @@ -10,6 +10,7 @@ import ( "log" "net" "strconv" + "strings" "sync" "time" ) @@ -54,45 +55,71 @@ func NewClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser, type Admin struct { // map of public id to agent - mutex sync.Mutex - agents map[string]*AgentConnection - clients []*ClientConnection + mutex sync.Mutex + agents map[string]*AgentConnection + clients []*ClientConnection + notifications chan *models.State } -func NewAdmin() *Admin { +func NewAdmin(notifications chan *models.State) *Admin { admin := Admin{ - mutex: sync.Mutex{}, - agents: make(map[string]*AgentConnection), - clients: make([]*ClientConnection, 0), // not strictly needed + mutex: sync.Mutex{}, + agents: make(map[string]*AgentConnection), + clients: make([]*ClientConnection, 0), // not strictly needed + notifications: notifications, } + admin.logStatus() return &admin } +func (admin *Admin) createNotification() *models.State { + state := models.State{} + state.Agents = make([]models.Agent, 0, len(admin.agents)) + state.Clients = make([]models.Client, 0, len(admin.clients)) + for _, agent := range admin.agents { + state.Agents = append(state.Agents, agent.Agent) + } + for _, client := range admin.clients { + state.Clients = append(state.Clients, client.Client) + } + return &state +} + func (admin *Admin) logStatus() { - fmt := "%-20s %-20s %-20s %-10s %-15s %-20s\n" - log.Printf(fmt, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME", - "USER", "HOST", "OS") + format := "%-20s %-20s %-20s %-10s %-15s %-20s" + + lines := make([]string, 0, 100) + + lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME", + "USER", "HOST", "OS")) for _, agent := range admin.agents { agent.commChannel.Session.RemoteAddr() - log.Printf(fmt, agent.PublicId, + lines = append(lines, fmt.Sprintf(format, agent.PublicId, agent.StartTime.Format(time.DateTime), agent.ExpiryTime.Format(time.DateTime), agent.AgentInfo.Username, agent.AgentInfo.Hostname, - agent.AgentInfo.OS) + agent.AgentInfo.OS)) } - log.Println("") - fmt = "%-10s %-20s %-20s %-20s %-20s\n" - log.Printf(fmt, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE") + lines = append(lines, "") + format = "%-10s %-20s %-20s %-20s %-20s" + lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")) for _, client := range admin.clients { - log.Printf(fmt, + lines = append(lines, fmt.Sprintf(format, strconv.Itoa(client.ClientId), client.PublicId, client.StartTime.Format(time.DateTime), client.client.RemoteAddr(), - client.SessionType) + client.SessionType)) } - log.Printf("\n") + lines = append(lines, "") + for _, line := range lines { + log.Println(line) + } + + notification := admin.createNotification() + notification.Ascii = strings.Join(lines, "\n") + admin.notifications <- notification } func (admin *Admin) getFreeId(publicId string) (string, error) { diff --git a/pkg/server/converge/websessions.go b/pkg/server/converge/websessions.go new file mode 100644 index 0000000..203819e --- /dev/null +++ b/pkg/server/converge/websessions.go @@ -0,0 +1,96 @@ +package converge + +import ( + "converge/pkg/models" + "log" + "net" + "sync" + "time" +) + +type WebSessions struct { + mutex sync.Mutex + sessions map[*WebSession]bool + lastNotification *models.State +} + +type WebSession struct { + notifications chan *models.State + conn net.Conn +} + +func NewWebSessions(notifications chan *models.State) *WebSessions { + websessions := &WebSessions{ + sessions: make(map[*WebSession]bool), + } + + go func() { + for { + notification := <-notifications + websessions.notifyWebSessions(notification) + } + }() + return websessions +} + +func (sessions *WebSessions) notifyWebSessions(notification *models.State) { + sessions.mutex.Lock() + defer sessions.mutex.Unlock() + sessions.lastNotification = notification + for session, _ := range sessions.sessions { + session.notifications <- notification + } +} + +func (sessions *WebSessions) NewSession(wsConnection net.Conn) *WebSession { + sessions.mutex.Lock() + defer sessions.mutex.Unlock() + session := &WebSession{ + notifications: make(chan *models.State, 10), + conn: wsConnection, + } + sessions.sessions[session] = true + sessions.logSessions() + session.notifications <- sessions.lastNotification + return session +} + +func (session *WebSession) WriteNotifications() { + for { + select { + case notification, ok := <-session.notifications: + if !ok { + log.Println("channel closed") + } + log.Println("Got notification: ", notification.Ascii) + msg := ` +
+

V1 ascii-art to be improved

+
` + notification.Ascii + `
+
` + _, err := session.conn.Write([]byte(msg)) + if err != nil { + log.Printf("WS connection closed: %v", err) + return + } + case <-time.After(10 * time.Second): + _, err := session.conn.Write([]byte("
ping
")) + if err != nil { + log.Printf("WS connection closed: %v", err) + return + } + } + } +} + +func (sessions *WebSessions) SessionClosed(session *WebSession) { + sessions.mutex.Lock() + defer sessions.mutex.Unlock() + + delete(sessions.sessions, session) + sessions.logSessions() +} + +func (sessions *WebSessions) logSessions() { + log.Printf("New web session, nsessions %d", len(sessions.sessions)) +} diff --git a/pkg/server/templates/about.templ b/pkg/server/templates/about.templ index 003875e..dddd15b 100644 --- a/pkg/server/templates/about.templ +++ b/pkg/server/templates/about.templ @@ -2,7 +2,7 @@ package templates templ About() {
-

About

+

about

Converge is a utility for troubleshooting builds on continuous integration servers. It solves a common problem where the cause of job failure is difficult to determine. diff --git a/pkg/server/templates/downloads.templ b/pkg/server/templates/downloads.templ index b024072..e06ba55 100644 --- a/pkg/server/templates/downloads.templ +++ b/pkg/server/templates/downloads.templ @@ -4,35 +4,35 @@ package templates templ Downloads() {

-

Downloads

+

downloads

- + - + - + - +
ComponentPurpose Linux WindowsPurpose
agentThe agent to run inside aa CI job agent agent.exeThe agent to run inside aa CI job
wsproxySSH proxy command that can be directly used by ssh wsproxy wsproxy.exeSSH proxy command that can be directly used by ssh
tcptowsTCP to WS tunnel for allowing regular - SSH and SFTP clients to connect to converge tcptows tcptows.exeTCP to WS tunnel for allowing regular + SSH and SFTP clients to connect to converge
diff --git a/pkg/server/templates/sessions.templ b/pkg/server/templates/sessions.templ index 3bed403..ceac7cd 100644 --- a/pkg/server/templates/sessions.templ +++ b/pkg/server/templates/sessions.templ @@ -3,6 +3,8 @@ package templates templ Sessions() {
+

sessions

+
Initial content
diff --git a/pkg/server/templates/usage.templ b/pkg/server/templates/usage.templ index e9e5abd..8154bc8 100644 --- a/pkg/server/templates/usage.templ +++ b/pkg/server/templates/usage.templ @@ -2,7 +2,7 @@ package templates templ Usage(secure string, host string, username string) {
-

Usage

+

usage

Continuous integration jobs

@@ -56,7 +56,7 @@ templ Usage(secure string, host string, username string) {

Local clients: using ssh with a proxy command

wsproxy is a command that can be used as a proxy command for SSH which performs the connection to the - remote server. This command needs to be downloaded only once (see downloads below). It does not depend on + remote server. This command needs to be downloaded only once (see downloads below). It does not depend on the converge implementation but only on the websocket standards. Other tools that provide a mapping of stdio to a websocket can also be used instead of wsproxy.

@@ -80,7 +80,7 @@ templ Usage(secure string, host string, username string) {

Local clients can connect using regular ssh and sftp commands through a tunnel that translates a local TCP port to a websocket connection in converge. See - the downloads section. + the downloads section. This runs a local client that allows SSH to port 10000 and connects to converge using a websocket connection.