Live updates of the sessions.
V1 in ascii-art. To be improved.
This commit is contained in:
parent
f382c02b41
commit
9a3618f06b
@ -199,7 +199,7 @@ func main() {
|
||||
|
||||
pprofPort, ok := os.LookupEnv("PPROF_PORT")
|
||||
if ok {
|
||||
log.Printf("Enablilng pprof on localhost:%s", pprofPort)
|
||||
log.Printf("Enabllng pprof on localhost:%s", pprofPort)
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe("localhost:"+pprofPort, nil))
|
||||
}()
|
||||
|
@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"converge/pkg/comms"
|
||||
"converge/pkg/models"
|
||||
"converge/pkg/server/converge"
|
||||
"converge/pkg/support/websocketutil"
|
||||
"fmt"
|
||||
@ -95,7 +96,9 @@ func main() {
|
||||
|
||||
log.Printf("Using username '%s' and password '%s'", userPassword.Username, userPassword.Password)
|
||||
|
||||
admin := converge.NewAdmin()
|
||||
notifications := make(chan *models.State, 10)
|
||||
admin := converge.NewAdmin(notifications)
|
||||
websessions := converge.NewWebSessions(notifications)
|
||||
|
||||
// For agents connecting
|
||||
registrationService := websocketutil.WebSocketService{
|
||||
@ -132,8 +135,12 @@ func main() {
|
||||
|
||||
// for the web browser getting live status updates.
|
||||
sessionService := websocketutil.WebSocketService{
|
||||
Handler: sessionHandler,
|
||||
Text: true,
|
||||
Handler: func(w http.ResponseWriter, r *http.Request, conn net.Conn) {
|
||||
websession := websessions.NewSession(conn)
|
||||
defer websessions.SessionClosed(websession)
|
||||
websession.WriteNotifications()
|
||||
},
|
||||
Text: true,
|
||||
}
|
||||
|
||||
// websocket endpoints
|
||||
|
@ -1,7 +1,6 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -16,33 +15,25 @@ type Message struct {
|
||||
|
||||
func sessionHandler(w http.ResponseWriter, r *http.Request, conn net.Conn) {
|
||||
log.Println("Got sessions websocket connection")
|
||||
go func() {
|
||||
for {
|
||||
b := make([]byte, 1024)
|
||||
log.Printf("Reading from %v", conn)
|
||||
_, err := conn.Read(b)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
i := 0
|
||||
for {
|
||||
time.Sleep(1 * time.Second)
|
||||
message := Message{
|
||||
Type: "update",
|
||||
Content: `
|
||||
message := `
|
||||
<div id="mycontent">
|
||||
New data: ` + strconv.Itoa(i) + `
|
||||
</div>
|
||||
`,
|
||||
}
|
||||
_, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
log.Printf("ERROR marshalling json: %v", err)
|
||||
return
|
||||
}
|
||||
//conn.Write(data)
|
||||
go func() {
|
||||
for {
|
||||
b := make([]byte, 1024)
|
||||
_, err := conn.Read(b)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
_, err = conn.Write([]byte(message.Content))
|
||||
`
|
||||
_, err := conn.Write([]byte(message))
|
||||
if err == nil {
|
||||
_, err = conn.Write([]byte("\n"))
|
||||
}
|
||||
|
7
pkg/models/state.go
Normal file
7
pkg/models/state.go
Normal file
@ -0,0 +1,7 @@
|
||||
package models
|
||||
|
||||
type State struct {
|
||||
Agents []Agent
|
||||
Clients []Client
|
||||
Ascii string
|
||||
}
|
@ -10,6 +10,7 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@ -54,45 +55,71 @@ func NewClient(publicId string, clientConn iowrappers2.ReadWriteAddrCloser,
|
||||
|
||||
type Admin struct {
|
||||
// map of public id to agent
|
||||
mutex sync.Mutex
|
||||
agents map[string]*AgentConnection
|
||||
clients []*ClientConnection
|
||||
mutex sync.Mutex
|
||||
agents map[string]*AgentConnection
|
||||
clients []*ClientConnection
|
||||
notifications chan *models.State
|
||||
}
|
||||
|
||||
func NewAdmin() *Admin {
|
||||
func NewAdmin(notifications chan *models.State) *Admin {
|
||||
admin := Admin{
|
||||
mutex: sync.Mutex{},
|
||||
agents: make(map[string]*AgentConnection),
|
||||
clients: make([]*ClientConnection, 0), // not strictly needed
|
||||
mutex: sync.Mutex{},
|
||||
agents: make(map[string]*AgentConnection),
|
||||
clients: make([]*ClientConnection, 0), // not strictly needed
|
||||
notifications: notifications,
|
||||
}
|
||||
admin.logStatus()
|
||||
return &admin
|
||||
}
|
||||
|
||||
func (admin *Admin) createNotification() *models.State {
|
||||
state := models.State{}
|
||||
state.Agents = make([]models.Agent, 0, len(admin.agents))
|
||||
state.Clients = make([]models.Client, 0, len(admin.clients))
|
||||
for _, agent := range admin.agents {
|
||||
state.Agents = append(state.Agents, agent.Agent)
|
||||
}
|
||||
for _, client := range admin.clients {
|
||||
state.Clients = append(state.Clients, client.Client)
|
||||
}
|
||||
return &state
|
||||
}
|
||||
|
||||
func (admin *Admin) logStatus() {
|
||||
fmt := "%-20s %-20s %-20s %-10s %-15s %-20s\n"
|
||||
log.Printf(fmt, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
|
||||
"USER", "HOST", "OS")
|
||||
format := "%-20s %-20s %-20s %-10s %-15s %-20s"
|
||||
|
||||
lines := make([]string, 0, 100)
|
||||
|
||||
lines = append(lines, fmt.Sprintf(format, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
|
||||
"USER", "HOST", "OS"))
|
||||
for _, agent := range admin.agents {
|
||||
agent.commChannel.Session.RemoteAddr()
|
||||
log.Printf(fmt, agent.PublicId,
|
||||
lines = append(lines, fmt.Sprintf(format, agent.PublicId,
|
||||
agent.StartTime.Format(time.DateTime),
|
||||
agent.ExpiryTime.Format(time.DateTime),
|
||||
agent.AgentInfo.Username,
|
||||
agent.AgentInfo.Hostname,
|
||||
agent.AgentInfo.OS)
|
||||
agent.AgentInfo.OS))
|
||||
}
|
||||
log.Println("")
|
||||
fmt = "%-10s %-20s %-20s %-20s %-20s\n"
|
||||
log.Printf(fmt, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")
|
||||
lines = append(lines, "")
|
||||
format = "%-10s %-20s %-20s %-20s %-20s"
|
||||
lines = append(lines, fmt.Sprintf(format, "CLIENT", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE"))
|
||||
for _, client := range admin.clients {
|
||||
log.Printf(fmt,
|
||||
lines = append(lines, fmt.Sprintf(format,
|
||||
strconv.Itoa(client.ClientId),
|
||||
client.PublicId,
|
||||
client.StartTime.Format(time.DateTime),
|
||||
client.client.RemoteAddr(),
|
||||
client.SessionType)
|
||||
client.SessionType))
|
||||
}
|
||||
log.Printf("\n")
|
||||
lines = append(lines, "")
|
||||
for _, line := range lines {
|
||||
log.Println(line)
|
||||
}
|
||||
|
||||
notification := admin.createNotification()
|
||||
notification.Ascii = strings.Join(lines, "\n")
|
||||
admin.notifications <- notification
|
||||
}
|
||||
|
||||
func (admin *Admin) getFreeId(publicId string) (string, error) {
|
||||
|
96
pkg/server/converge/websessions.go
Normal file
96
pkg/server/converge/websessions.go
Normal file
@ -0,0 +1,96 @@
|
||||
package converge
|
||||
|
||||
import (
|
||||
"converge/pkg/models"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type WebSessions struct {
|
||||
mutex sync.Mutex
|
||||
sessions map[*WebSession]bool
|
||||
lastNotification *models.State
|
||||
}
|
||||
|
||||
type WebSession struct {
|
||||
notifications chan *models.State
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
func NewWebSessions(notifications chan *models.State) *WebSessions {
|
||||
websessions := &WebSessions{
|
||||
sessions: make(map[*WebSession]bool),
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
notification := <-notifications
|
||||
websessions.notifyWebSessions(notification)
|
||||
}
|
||||
}()
|
||||
return websessions
|
||||
}
|
||||
|
||||
func (sessions *WebSessions) notifyWebSessions(notification *models.State) {
|
||||
sessions.mutex.Lock()
|
||||
defer sessions.mutex.Unlock()
|
||||
sessions.lastNotification = notification
|
||||
for session, _ := range sessions.sessions {
|
||||
session.notifications <- notification
|
||||
}
|
||||
}
|
||||
|
||||
func (sessions *WebSessions) NewSession(wsConnection net.Conn) *WebSession {
|
||||
sessions.mutex.Lock()
|
||||
defer sessions.mutex.Unlock()
|
||||
session := &WebSession{
|
||||
notifications: make(chan *models.State, 10),
|
||||
conn: wsConnection,
|
||||
}
|
||||
sessions.sessions[session] = true
|
||||
sessions.logSessions()
|
||||
session.notifications <- sessions.lastNotification
|
||||
return session
|
||||
}
|
||||
|
||||
func (session *WebSession) WriteNotifications() {
|
||||
for {
|
||||
select {
|
||||
case notification, ok := <-session.notifications:
|
||||
if !ok {
|
||||
log.Println("channel closed")
|
||||
}
|
||||
log.Println("Got notification: ", notification.Ascii)
|
||||
msg := `
|
||||
<div id="mycontent">
|
||||
<p>V1 ascii-art to be improved </p>
|
||||
<pre>` + notification.Ascii + `</pre>
|
||||
</div>`
|
||||
_, err := session.conn.Write([]byte(msg))
|
||||
if err != nil {
|
||||
log.Printf("WS connection closed: %v", err)
|
||||
return
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
_, err := session.conn.Write([]byte("<div>ping</div>"))
|
||||
if err != nil {
|
||||
log.Printf("WS connection closed: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sessions *WebSessions) SessionClosed(session *WebSession) {
|
||||
sessions.mutex.Lock()
|
||||
defer sessions.mutex.Unlock()
|
||||
|
||||
delete(sessions.sessions, session)
|
||||
sessions.logSessions()
|
||||
}
|
||||
|
||||
func (sessions *WebSessions) logSessions() {
|
||||
log.Printf("New web session, nsessions %d", len(sessions.sessions))
|
||||
}
|
@ -2,7 +2,7 @@ package templates
|
||||
|
||||
templ About() {
|
||||
<div>
|
||||
<h1>About</h1>
|
||||
<h1>about</h1>
|
||||
<p>
|
||||
Converge is a utility for troubleshooting builds on continuous integration servers.
|
||||
It solves a common problem where the cause of job failure is difficult to determine.
|
||||
|
@ -4,35 +4,35 @@ package templates
|
||||
templ Downloads() {
|
||||
<div>
|
||||
|
||||
<h1 id="downloads">Downloads</h1>
|
||||
<h1>downloads</h1>
|
||||
|
||||
<table class="table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Component</th>
|
||||
<th>Purpose</th>
|
||||
<th>Linux</th>
|
||||
<th>Windows</th>
|
||||
<th>Purpose</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tr>
|
||||
<td>agent</td>
|
||||
<td>The agent to run inside aa CI job</td>
|
||||
<td><a href="../static/agent">agent</a></td>
|
||||
<td><a href="../static/agent.exe">agent.exe</a></td>
|
||||
<td>The agent to run inside aa CI job</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>wsproxy</td>
|
||||
<td>SSH proxy command that can be directly used by ssh</td>
|
||||
<td><a href="../static/wsproxy">wsproxy</a></td>
|
||||
<td><a href="../static/wsproxy.exe">wsproxy.exe</a></td>
|
||||
<td>SSH proxy command that can be directly used by ssh</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>tcptows</td>
|
||||
<td>TCP to WS tunnel for allowing regular
|
||||
SSH and SFTP clients to connect to converge</td>
|
||||
<td><a href="../static/tcptows">tcptows</a></td>
|
||||
<td><a href="../static/tcptows.exe">tcptows.exe</a></td>
|
||||
<td>TCP to WS tunnel for allowing regular
|
||||
SSH and SFTP clients to connect to converge</td>
|
||||
</tr>
|
||||
</table>
|
||||
</div>
|
||||
|
@ -3,6 +3,8 @@ package templates
|
||||
|
||||
templ Sessions() {
|
||||
<div hx-ext="ws" ws-connect="/ws/sessions">
|
||||
<h1>sessions</h1>
|
||||
|
||||
<div id="mycontent">
|
||||
Initial content
|
||||
</div>
|
||||
|
@ -2,7 +2,7 @@ package templates
|
||||
|
||||
templ Usage(secure string, host string, username string) {
|
||||
<div>
|
||||
<h1>Usage</h1>
|
||||
<h1>usage</h1>
|
||||
|
||||
<h2>Continuous integration jobs</h2>
|
||||
|
||||
@ -56,7 +56,7 @@ templ Usage(secure string, host string, username string) {
|
||||
<h2>Local clients: using ssh with a proxy command </h2>
|
||||
|
||||
<p><code>wsproxy</code> is a command that can be used as a proxy command for SSH which performs the connection to the
|
||||
remote server. This command needs to be downloaded only once (see <a href="#downloads">downloads</a> below). It does not depend on
|
||||
remote server. This command needs to be downloaded only once (see <a href="downloads.html">downloads</a> below). It does not depend on
|
||||
the converge implementation but only on the websocket standards. Other tools that
|
||||
provide a mapping of stdio to a websocket can also be used instead of wsproxy.
|
||||
</p>
|
||||
@ -80,7 +80,7 @@ templ Usage(secure string, host string, username string) {
|
||||
<p>
|
||||
Local clients can connect using regular ssh and sftp commands through a tunnel that
|
||||
translates a local TCP port to a websocket connection in converge. See
|
||||
the <a href="#downloads">downloads</a> section.
|
||||
the <a href="downloads.html">downloads</a> section.
|
||||
This runs a local client that allows SSH to port 10000 and connects to converge using
|
||||
a websocket connection.
|
||||
</p>
|
||||
|
Loading…
Reference in New Issue
Block a user