diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go
index e78b3a9..0401383 100755
--- a/cmd/agent/agent.go
+++ b/cmd/agent/agent.go
@@ -199,7 +199,7 @@ func main() {
pprofPort, ok := os.LookupEnv("PPROF_PORT")
if ok {
- log.Printf("Enablilng pprof on localhost:%s", pprofPort)
+ log.Printf("Enabllng pprof on localhost:%s", pprofPort)
go func() {
log.Println(http.ListenAndServe("localhost:"+pprofPort, nil))
}()
diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go
index 4cc0592..35ebfde 100644
--- a/cmd/converge/converge.go
+++ b/cmd/converge/converge.go
@@ -2,6 +2,7 @@ package main
import (
"converge/pkg/comms"
+ "converge/pkg/models"
"converge/pkg/server/converge"
"converge/pkg/support/websocketutil"
"fmt"
@@ -95,7 +96,9 @@ func main() {
log.Printf("Using username '%s' and password '%s'", userPassword.Username, userPassword.Password)
- admin := converge.NewAdmin()
+ notifications := make(chan *models.State, 10)
+ admin := converge.NewAdmin(notifications)
+ websessions := converge.NewWebSessions(notifications)
// For agents connecting
registrationService := websocketutil.WebSocketService{
@@ -132,8 +135,12 @@ func main() {
// for the web browser getting live status updates.
sessionService := websocketutil.WebSocketService{
- Handler: sessionHandler,
- Text: true,
+ Handler: func(w http.ResponseWriter, r *http.Request, conn net.Conn) {
+ websession := websessions.NewSession(conn)
+ defer websessions.SessionClosed(websession)
+ websession.WriteNotifications()
+ },
+ Text: true,
}
// websocket endpoints
diff --git a/cmd/converge/sessionhandler.go b/cmd/converge/sessionhandler.go
index dc90982..66a4efd 100644
--- a/cmd/converge/sessionhandler.go
+++ b/cmd/converge/sessionhandler.go
@@ -1,7 +1,6 @@
package main
import (
- "encoding/json"
"log"
"net"
"net/http"
@@ -16,33 +15,25 @@ type Message struct {
func sessionHandler(w http.ResponseWriter, r *http.Request, conn net.Conn) {
log.Println("Got sessions websocket connection")
+ go func() {
+ for {
+ b := make([]byte, 1024)
+ log.Printf("Reading from %v", conn)
+ _, err := conn.Read(b)
+ if err != nil {
+ return
+ }
+ }
+ }()
i := 0
for {
time.Sleep(1 * time.Second)
- message := Message{
- Type: "update",
- Content: `
+ message := `
New data: ` + strconv.Itoa(i) + `
-`,
- }
- _, err := json.Marshal(message)
- if err != nil {
- log.Printf("ERROR marshalling json: %v", err)
- return
- }
- //conn.Write(data)
- go func() {
- for {
- b := make([]byte, 1024)
- _, err := conn.Read(b)
- if err != nil {
- return
- }
- }
- }()
- _, err = conn.Write([]byte(message.Content))
+`
+ _, err := conn.Write([]byte(message))
if err == nil {
_, err = conn.Write([]byte("\n"))
}
diff --git a/pkg/models/state.go b/pkg/models/state.go
new file mode 100644
index 0000000..70d1cfd
--- /dev/null
+++ b/pkg/models/state.go
@@ -0,0 +1,7 @@
+package models
+
+type State struct {
+ Agents []Agent
+ Clients []Client
+ Ascii string
+}
diff --git a/pkg/server/converge/admin.go b/pkg/server/converge/admin.go
index df59903..0dbf315 100644
--- a/pkg/server/converge/admin.go
+++ b/pkg/server/converge/admin.go
@@ -10,6 +10,7 @@ import (
"log"
"net"
"strconv"
+ "strings"
"sync"
"time"
)
@@ -54,45 +55,71 @@ func NewClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser,
type Admin struct {
// map of public id to agent
- mutex sync.Mutex
- agents map[string]*AgentConnection
- clients []*ClientConnection
+ mutex sync.Mutex
+ agents map[string]*AgentConnection
+ clients []*ClientConnection
+ notifications chan *models.State
}
-func NewAdmin() *Admin {
+func NewAdmin(notifications chan *models.State) *Admin {
admin := Admin{
- mutex: sync.Mutex{},
- agents: make(map[string]*AgentConnection),
- clients: make([]*ClientConnection, 0), // not strictly needed
+ mutex: sync.Mutex{},
+ agents: make(map[string]*AgentConnection),
+ clients: make([]*ClientConnection, 0), // not strictly needed
+ notifications: notifications,
}
+ admin.logStatus()
return &admin
}
+func (admin *Admin) createNotification() *models.State {
+ state := models.State{}
+ state.Agents = make([]models.Agent, 0, len(admin.agents))
+ state.Clients = make([]models.Client, 0, len(admin.clients))
+ for _, agent := range admin.agents {
+ state.Agents = append(state.Agents, agent.Agent)
+ }
+ for _, client := range admin.clients {
+ state.Clients = append(state.Clients, client.Client)
+ }
+ return &state
+}
+
func (admin *Admin) logStatus() {
- fmt := "%-20s %-20s %-20s %-10s %-15s %-20s\n"
- log.Printf(fmt, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
- "USER", "HOST", "OS")
+ format := "%-20s %-20s %-20s %-10s %-15s %-20s"
+
+ lines := make([]string, 0, 100)
+
+ lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
+ "USER", "HOST", "OS"))
for _, agent := range admin.agents {
agent.commChannel.Session.RemoteAddr()
- log.Printf(fmt, agent.PublicId,
+ lines = append(lines, fmt.Sprintf(format, agent.PublicId,
agent.StartTime.Format(time.DateTime),
agent.ExpiryTime.Format(time.DateTime),
agent.AgentInfo.Username,
agent.AgentInfo.Hostname,
- agent.AgentInfo.OS)
+ agent.AgentInfo.OS))
}
- log.Println("")
- fmt = "%-10s %-20s %-20s %-20s %-20s\n"
- log.Printf(fmt, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")
+ lines = append(lines, "")
+ format = "%-10s %-20s %-20s %-20s %-20s"
+ lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE"))
for _, client := range admin.clients {
- log.Printf(fmt,
+ lines = append(lines, fmt.Sprintf(format,
strconv.Itoa(client.ClientId),
client.PublicId,
client.StartTime.Format(time.DateTime),
client.client.RemoteAddr(),
- client.SessionType)
+ client.SessionType))
}
- log.Printf("\n")
+ lines = append(lines, "")
+ for _, line := range lines {
+ log.Println(line)
+ }
+
+ notification := admin.createNotification()
+ notification.Ascii = strings.Join(lines, "\n")
+ admin.notifications <- notification
}
func (admin *Admin) getFreeId(publicId string) (string, error) {
diff --git a/pkg/server/converge/websessions.go b/pkg/server/converge/websessions.go
new file mode 100644
index 0000000..203819e
--- /dev/null
+++ b/pkg/server/converge/websessions.go
@@ -0,0 +1,96 @@
+package converge
+
+import (
+ "converge/pkg/models"
+ "log"
+ "net"
+ "sync"
+ "time"
+)
+
+type WebSessions struct {
+ mutex sync.Mutex
+ sessions map[*WebSession]bool
+ lastNotification *models.State
+}
+
+type WebSession struct {
+ notifications chan *models.State
+ conn net.Conn
+}
+
+func NewWebSessions(notifications chan *models.State) *WebSessions {
+ websessions := &WebSessions{
+ sessions: make(map[*WebSession]bool),
+ }
+
+ go func() {
+ for {
+ notification := <-notifications
+ websessions.notifyWebSessions(notification)
+ }
+ }()
+ return websessions
+}
+
+func (sessions *WebSessions) notifyWebSessions(notification *models.State) {
+ sessions.mutex.Lock()
+ defer sessions.mutex.Unlock()
+ sessions.lastNotification = notification
+ for session, _ := range sessions.sessions {
+ session.notifications <- notification
+ }
+}
+
+func (sessions *WebSessions) NewSession(wsConnection net.Conn) *WebSession {
+ sessions.mutex.Lock()
+ defer sessions.mutex.Unlock()
+ session := &WebSession{
+ notifications: make(chan *models.State, 10),
+ conn: wsConnection,
+ }
+ sessions.sessions[session] = true
+ sessions.logSessions()
+ session.notifications <- sessions.lastNotification
+ return session
+}
+
+func (session *WebSession) WriteNotifications() {
+ for {
+ select {
+ case notification, ok := <-session.notifications:
+ if !ok {
+ log.Println("channel closed")
+ }
+ log.Println("Got notification: ", notification.Ascii)
+ msg := `
+
+
V1 ascii-art to be improved
+
` + notification.Ascii + `
+
`
+ _, err := session.conn.Write([]byte(msg))
+ if err != nil {
+ log.Printf("WS connection closed: %v", err)
+ return
+ }
+ case <-time.After(10 * time.Second):
+ _, err := session.conn.Write([]byte("ping
"))
+ if err != nil {
+ log.Printf("WS connection closed: %v", err)
+ return
+ }
+ }
+ }
+}
+
+func (sessions *WebSessions) SessionClosed(session *WebSession) {
+ sessions.mutex.Lock()
+ defer sessions.mutex.Unlock()
+
+ delete(sessions.sessions, session)
+ sessions.logSessions()
+}
+
+func (sessions *WebSessions) logSessions() {
+ log.Printf("New web session, nsessions %d", len(sessions.sessions))
+}
diff --git a/pkg/server/templates/about.templ b/pkg/server/templates/about.templ
index 003875e..dddd15b 100644
--- a/pkg/server/templates/about.templ
+++ b/pkg/server/templates/about.templ
@@ -2,7 +2,7 @@ package templates
templ About() {
-
About
+
about
Converge is a utility for troubleshooting builds on continuous integration servers.
It solves a common problem where the cause of job failure is difficult to determine.
diff --git a/pkg/server/templates/downloads.templ b/pkg/server/templates/downloads.templ
index b024072..e06ba55 100644
--- a/pkg/server/templates/downloads.templ
+++ b/pkg/server/templates/downloads.templ
@@ -4,35 +4,35 @@ package templates
templ Downloads() {
-
Downloads
+
downloads
Component |
- Purpose |
Linux |
Windows |
+ Purpose |
agent |
- The agent to run inside aa CI job |
agent |
agent.exe |
+ The agent to run inside aa CI job |
wsproxy |
- SSH proxy command that can be directly used by ssh |
wsproxy |
wsproxy.exe |
+ SSH proxy command that can be directly used by ssh |
tcptows |
- TCP to WS tunnel for allowing regular
- SSH and SFTP clients to connect to converge |
tcptows |
tcptows.exe |
+ TCP to WS tunnel for allowing regular
+ SSH and SFTP clients to connect to converge |
diff --git a/pkg/server/templates/sessions.templ b/pkg/server/templates/sessions.templ
index 3bed403..ceac7cd 100644
--- a/pkg/server/templates/sessions.templ
+++ b/pkg/server/templates/sessions.templ
@@ -3,6 +3,8 @@ package templates
templ Sessions() {
+
sessions
+
Initial content
diff --git a/pkg/server/templates/usage.templ b/pkg/server/templates/usage.templ
index e9e5abd..8154bc8 100644
--- a/pkg/server/templates/usage.templ
+++ b/pkg/server/templates/usage.templ
@@ -2,7 +2,7 @@ package templates
templ Usage(secure string, host string, username string) {
-
Usage
+
usage
Continuous integration jobs
@@ -56,7 +56,7 @@ templ Usage(secure string, host string, username string) {
Local clients: using ssh with a proxy command
wsproxy
is a command that can be used as a proxy command for SSH which performs the connection to the
- remote server. This command needs to be downloaded only once (see downloads below). It does not depend on
+ remote server. This command needs to be downloaded only once (see downloads below). It does not depend on
the converge implementation but only on the websocket standards. Other tools that
provide a mapping of stdio to a websocket can also be used instead of wsproxy.
@@ -80,7 +80,7 @@ templ Usage(secure string, host string, username string) {
Local clients can connect using regular ssh and sftp commands through a tunnel that
translates a local TCP port to a websocket connection in converge. See
- the downloads section.
+ the downloads section.
This runs a local client that allows SSH to port 10000 and connects to converge using
a websocket connection.