Live updates of the sessions.

V1 in ascii-art. To be improved.
This commit is contained in:
Erik Brakkee 2024-07-30 21:51:30 +02:00
parent 100771a7ba
commit d6d2d5648c
10 changed files with 184 additions and 54 deletions

View File

@ -199,7 +199,7 @@ func main() {
pprofPort, ok := os.LookupEnv("PPROF_PORT")
if ok {
log.Printf("Enablilng pprof on localhost:%s", pprofPort)
log.Printf("Enabllng pprof on localhost:%s", pprofPort)
go func() {
log.Println(http.ListenAndServe("localhost:"+pprofPort, nil))
}()

View File

@ -2,6 +2,7 @@ package main
import (
"converge/pkg/comms"
"converge/pkg/models"
"converge/pkg/server/converge"
"converge/pkg/support/websocketutil"
"fmt"
@ -95,7 +96,9 @@ func main() {
log.Printf("Using username '%s' and password '%s'", userPassword.Username, userPassword.Password)
admin := converge.NewAdmin()
notifications := make(chan *models.State, 10)
admin := converge.NewAdmin(notifications)
websessions := converge.NewWebSessions(notifications)
// For agents connecting
registrationService := websocketutil.WebSocketService{
@ -132,7 +135,11 @@ func main() {
// for the web browser getting live status updates.
sessionService := websocketutil.WebSocketService{
Handler: sessionHandler,
Handler: func(w http.ResponseWriter, r *http.Request, conn net.Conn) {
websession := websessions.NewSession(conn)
defer websessions.SessionClosed(websession)
websession.WriteNotifications()
},
Text: true,
}

View File

@ -1,7 +1,6 @@
package main
import (
"encoding/json"
"log"
"net"
"net/http"
@ -16,33 +15,25 @@ type Message struct {
func sessionHandler(w http.ResponseWriter, r *http.Request, conn net.Conn) {
log.Println("Got sessions websocket connection")
i := 0
for {
time.Sleep(1 * time.Second)
message := Message{
Type: "update",
Content: `
<div id="mycontent">
New data: ` + strconv.Itoa(i) + `
</div>
`,
}
_, err := json.Marshal(message)
if err != nil {
log.Printf("ERROR marshalling json: %v", err)
return
}
//conn.Write(data)
go func() {
for {
b := make([]byte, 1024)
log.Printf("Reading from %v", conn)
_, err := conn.Read(b)
if err != nil {
return
}
}
}()
_, err = conn.Write([]byte(message.Content))
i := 0
for {
time.Sleep(1 * time.Second)
message := `
<div id="mycontent">
New data: ` + strconv.Itoa(i) + `
</div>
`
_, err := conn.Write([]byte(message))
if err == nil {
_, err = conn.Write([]byte("\n"))
}

7
pkg/models/state.go Normal file
View File

@ -0,0 +1,7 @@
package models
type State struct {
Agents []Agent
Clients []Client
Ascii string
}

View File

@ -10,6 +10,7 @@ import (
"log"
"net"
"strconv"
"strings"
"sync"
"time"
)
@ -57,42 +58,68 @@ type Admin struct {
mutex sync.Mutex
agents map[string]*AgentConnection
clients []*ClientConnection
notifications chan *models.State
}
func NewAdmin() *Admin {
func NewAdmin(notifications chan *models.State) *Admin {
admin := Admin{
mutex: sync.Mutex{},
agents: make(map[string]*AgentConnection),
clients: make([]*ClientConnection, 0), // not strictly needed
notifications: notifications,
}
admin.logStatus()
return &admin
}
func (admin *Admin) createNotification() *models.State {
state := models.State{}
state.Agents = make([]models.Agent, 0, len(admin.agents))
state.Clients = make([]models.Client, 0, len(admin.clients))
for _, agent := range admin.agents {
state.Agents = append(state.Agents, agent.Agent)
}
for _, client := range admin.clients {
state.Clients = append(state.Clients, client.Client)
}
return &state
}
func (admin *Admin) logStatus() {
fmt := "%-20s %-20s %-20s %-10s %-15s %-20s\n"
log.Printf(fmt, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
"USER", "HOST", "OS")
format := "%-20s %-20s %-20s %-10s %-15s %-20s"
lines := make([]string, 0, 100)
lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
"USER", "HOST", "OS"))
for _, agent := range admin.agents {
agent.commChannel.Session.RemoteAddr()
log.Printf(fmt, agent.PublicId,
lines = append(lines, fmt.Sprintf(format, agent.PublicId,
agent.StartTime.Format(time.DateTime),
agent.ExpiryTime.Format(time.DateTime),
agent.AgentInfo.Username,
agent.AgentInfo.Hostname,
agent.AgentInfo.OS)
agent.AgentInfo.OS))
}
log.Println("")
fmt = "%-10s %-20s %-20s %-20s %-20s\n"
log.Printf(fmt, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")
lines = append(lines, "")
format = "%-10s %-20s %-20s %-20s %-20s"
lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE"))
for _, client := range admin.clients {
log.Printf(fmt,
lines = append(lines, fmt.Sprintf(format,
strconv.Itoa(client.ClientId),
client.PublicId,
client.StartTime.Format(time.DateTime),
client.client.RemoteAddr(),
client.SessionType)
client.SessionType))
}
log.Printf("\n")
lines = append(lines, "")
for _, line := range lines {
log.Println(line)
}
notification := admin.createNotification()
notification.Ascii = strings.Join(lines, "\n")
admin.notifications <- notification
}
func (admin *Admin) getFreeId(publicId string) (string, error) {

View File

@ -0,0 +1,96 @@
package converge
import (
"converge/pkg/models"
"log"
"net"
"sync"
"time"
)
type WebSessions struct {
mutex sync.Mutex
sessions map[*WebSession]bool
lastNotification *models.State
}
type WebSession struct {
notifications chan *models.State
conn net.Conn
}
func NewWebSessions(notifications chan *models.State) *WebSessions {
websessions := &WebSessions{
sessions: make(map[*WebSession]bool),
}
go func() {
for {
notification := <-notifications
websessions.notifyWebSessions(notification)
}
}()
return websessions
}
func (sessions *WebSessions) notifyWebSessions(notification *models.State) {
sessions.mutex.Lock()
defer sessions.mutex.Unlock()
sessions.lastNotification = notification
for session, _ := range sessions.sessions {
session.notifications <- notification
}
}
func (sessions *WebSessions) NewSession(wsConnection net.Conn) *WebSession {
sessions.mutex.Lock()
defer sessions.mutex.Unlock()
session := &WebSession{
notifications: make(chan *models.State, 10),
conn: wsConnection,
}
sessions.sessions[session] = true
sessions.logSessions()
session.notifications <- sessions.lastNotification
return session
}
func (session *WebSession) WriteNotifications() {
for {
select {
case notification, ok := <-session.notifications:
if !ok {
log.Println("channel closed")
}
log.Println("Got notification: ", notification.Ascii)
msg := `
<div id="mycontent">
<p>V1 ascii-art to be improved </p>
<pre>` + notification.Ascii + `</pre>
</div>`
_, err := session.conn.Write([]byte(msg))
if err != nil {
log.Printf("WS connection closed: %v", err)
return
}
case <-time.After(10 * time.Second):
_, err := session.conn.Write([]byte("<div>ping</div>"))
if err != nil {
log.Printf("WS connection closed: %v", err)
return
}
}
}
}
func (sessions *WebSessions) SessionClosed(session *WebSession) {
sessions.mutex.Lock()
defer sessions.mutex.Unlock()
delete(sessions.sessions, session)
sessions.logSessions()
}
func (sessions *WebSessions) logSessions() {
log.Printf("New web session, nsessions %d", len(sessions.sessions))
}

View File

@ -2,7 +2,7 @@ package templates
templ About() {
<div>
<h1>About</h1>
<h1>about</h1>
<p>
Converge is a utility for troubleshooting builds on continuous integration servers.
It solves a common problem where the cause of job failure is difficult to determine.

View File

@ -4,35 +4,35 @@ package templates
templ Downloads() {
<div>
<h1 id="downloads">Downloads</h1>
<h1>downloads</h1>
<table class="table">
<thead>
<tr>
<th>Component</th>
<th>Purpose</th>
<th>Linux</th>
<th>Windows</th>
<th>Purpose</th>
</tr>
</thead>
<tr>
<td>agent</td>
<td>The agent to run inside aa CI job</td>
<td><a href="../static/agent">agent</a></td>
<td><a href="../static/agent.exe">agent.exe</a></td>
<td>The agent to run inside aa CI job</td>
</tr>
<tr>
<td>wsproxy</td>
<td>SSH proxy command that can be directly used by ssh</td>
<td><a href="../static/wsproxy">wsproxy</a></td>
<td><a href="../static/wsproxy.exe">wsproxy.exe</a></td>
<td>SSH proxy command that can be directly used by ssh</td>
</tr>
<tr>
<td>tcptows</td>
<td>TCP to WS tunnel for allowing regular
SSH and SFTP clients to connect to converge</td>
<td><a href="../static/tcptows">tcptows</a></td>
<td><a href="../static/tcptows.exe">tcptows.exe</a></td>
<td>TCP to WS tunnel for allowing regular
SSH and SFTP clients to connect to converge</td>
</tr>
</table>
</div>

View File

@ -3,6 +3,8 @@ package templates
templ Sessions() {
<div hx-ext="ws" ws-connect="/ws/sessions">
<h1>sessions</h1>
<div id="mycontent">
Initial content
</div>

View File

@ -2,7 +2,7 @@ package templates
templ Usage(secure string, host string, username string) {
<div>
<h1>Usage</h1>
<h1>usage</h1>
<h2>Continuous integration jobs</h2>
@ -56,7 +56,7 @@ templ Usage(secure string, host string, username string) {
<h2>Local clients: using ssh with a proxy command </h2>
<p><code>wsproxy</code> is a command that can be used as a proxy command for SSH which performs the connection to the
remote server. This command needs to be downloaded only once (see <a href="#downloads">downloads</a> below). It does not depend on
remote server. This command needs to be downloaded only once (see <a href="downloads.html">downloads</a> below). It does not depend on
the converge implementation but only on the websocket standards. Other tools that
provide a mapping of stdio to a websocket can also be used instead of wsproxy.
</p>
@ -80,7 +80,7 @@ templ Usage(secure string, host string, username string) {
<p>
Local clients can connect using regular ssh and sftp commands through a tunnel that
translates a local TCP port to a websocket connection in converge. See
the <a href="#downloads">downloads</a> section.
the <a href="downloads.html">downloads</a> section.
This runs a local client that allows SSH to port 10000 and connects to converge using
a websocket connection.
</p>