converge/pkg/server/ui/websessions.go
Erik Brakkee 57c77e7d07 initial state is now sent again when the websocket connection is
established.
Also throttling based on user input. When browser sends multiple
messages per second the user will still only get one notification per
second at most.
2024-08-18 11:44:16 +02:00

159 lines
4.0 KiB
Go

package ui
import (
"context"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/support/throttling"
"log"
"net"
"net/http"
"sync"
"time"
)
// WebSessions keeps track of the registered web sessions and of the most
// recent state notification, so that the state can be pushed to every session.
type WebSessions struct {
// mutex is locked by the methods of WebSessions while they modify sessions
// or lastNotification.
mutex sync.Mutex
// sessions is used as a set of registered sessions; within this file the only
// value ever stored is true.
sessions map[*WebSession]bool
// lastNotification is the state most recently passed to notifyWebSessions;
// NewWebSessions initializes it with models.NewState().
lastNotification *models.State
}
// WebSession is a single websocket connection of the web application together
// with the channel over which state notifications are delivered to it.
type WebSession struct {
// notifications carries the states that WriteNotifications writes to conn.
// NewSession creates it as an unbuffered channel.
notifications chan *models.State
// conn is the connection handed to NewSession.
conn net.Conn
// ctx is the context handed to NewSession; notifyWebSessions skips the
// session once this context is done.
ctx context.Context
}
// NewWebSessions creates an empty session registry whose last known state is
// models.NewState() and starts a goroutine that forwards every state received
// on notifications to all registered sessions.
//
// The forwarding goroutine runs until notifications is closed. Ranging over
// the channel (instead of an unconditional receive in an endless loop) makes
// sure that a closed channel ends the goroutine rather than turning it into a
// busy loop that hands nil states to every session.
func NewWebSessions(notifications chan *models.State) *WebSessions {
	websessions := &WebSessions{
		sessions:         make(map[*WebSession]bool),
		lastNotification: models.NewState(),
	}
	go func() {
		for notification := range notifications {
			websessions.notifyWebSessions(notification)
		}
	}()
	return websessions
}
// logRequestMetadata writes the method, URL, protocol, host, remote address,
// request URI, headers, query parameters and form data of r to the standard
// logger, one log line per item.
//
// It calls r.ParseForm, which populates r.Form and r.PostForm on the request;
// a parse error is logged instead of being dropped, and whatever form data
// could be parsed is still logged afterwards.
func logRequestMetadata(r *http.Request) {
	log.Printf("Method: %s", r.Method)
	log.Printf("URL: %s", r.URL)
	log.Printf("Protocol: %s", r.Proto)
	log.Printf("Host: %s", r.Host)
	log.Printf("RemoteAddr: %s", r.RemoteAddr)
	log.Printf("RequestURI: %s", r.RequestURI)
	log.Printf("Headers: %v", r.Header)

	// Log query parameters
	log.Printf("Query parameters: %v", r.URL.Query())

	// Log form data (if applicable)
	if err := r.ParseForm(); err != nil {
		log.Printf("Error parsing form: %v", err)
	}
	log.Printf("Form data: %v", r.Form)
}
// notifyWebSessions records notification as the last known state and offers it
// to every registered session.
//
// The mutex is held for the whole call. Each send blocks until the session
// either receives the notification or its context is done, whichever happens
// first.
func (sessions *WebSessions) notifyWebSessions(notification *models.State) {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	sessions.lastNotification = notification

	for target := range sessions.sessions {
		select {
		case target.notifications <- notification:
			// delivered to the session
		case <-target.ctx.Done():
			// the session is closed; it is removed from the registry at a
			// higher level once the session is done.
		}
	}
}
// NewSession creates a session for wsConnection, registers it and returns it.
//
// Two goroutines are started for the session:
//   - one sends the last known state to the session as soon as it can be
//     delivered (the initial notification);
//   - one reads from wsConnection. Every successful read is treated as a
//     request for the latest state and is passed through a throttler
//     (configured with two one-second durations) before it reaches the
//     session. A read error is logged, cancel is called and the goroutine ends.
//
// ctx is the session's context: once it is done, pending sends to the session
// are abandoned so that none of the goroutines stays blocked on a session that
// nobody reads from anymore. cancel is called when reading from wsConnection
// fails.
func (sessions *WebSessions) NewSession(wsConnection net.Conn, ctx context.Context,
	cancel context.CancelFunc) *WebSession {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	session := &WebSession{
		notifications: make(chan *models.State),
		conn:          wsConnection,
		ctx:           ctx,
	}

	// send hands state to the session, giving up when the session's context
	// is done. Without the ctx case a sender would block forever on the
	// unbuffered channel after the receiver has gone away.
	send := func(state *models.State) {
		select {
		case <-ctx.Done():
		case session.notifications <- state:
		}
	}

	// latest returns the most recent state. lastNotification is written by
	// notifyWebSessions under the mutex, so it has to be read under the mutex
	// as well to avoid a data race. It must only be called from the
	// goroutines below, never while this function still holds the mutex.
	latest := func() *models.State {
		sessions.mutex.Lock()
		defer sessions.mutex.Unlock()
		return sessions.lastNotification
	}

	// initial notification as soon as client connects.
	go func() {
		send(latest())
	}()

	go func() {
		throttler := throttling.NewAsyncThrottler(func(state *models.State) {
			send(state)
		}, time.Second, time.Second)

		// The content of the message is not inspected, so one buffer can be
		// reused for every read.
		buf := make([]byte, 1024)
		for {
			// the web app opens one websocket connection and sends a hello
			// message asking for the latest state when a page is loaded that requires this.
			// Minor issue at this time is that a notification is sent twice to the client.
			if _, err := wsConnection.Read(buf); err != nil {
				log.Printf("Got error reading %v", err)
				cancel()
				return
			}
			throttler.Notify(latest())
		}
	}()

	sessions.sessions[session] = true
	sessions.logSessions()
	return session
}
// WriteNotifications is the write loop of a session. It runs until ctx is
// done, the notifications channel is closed, or writing to the connection
// fails, and calls cancel when it returns.
//
// Every state received on the session's notifications channel is rendered to
// the connection using location. In addition, an empty write is issued every
// ten seconds; a failure of that write ends the loop as well.
func (session *WebSession) WriteNotifications(location *time.Location, ctx context.Context, cancel context.CancelFunc) {
	keepAlive := time.NewTicker(10 * time.Second)
	defer keepAlive.Stop()
	// Whatever the reason for leaving the loop, including a failure to send
	// to the web client, the context is canceled.
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return

		case <-keepAlive.C:
			if _, err := session.conn.Write([]byte{}); err != nil {
				log.Printf("WS connection closed: %v", err)
				return
			}

		case state, open := <-session.notifications:
			if !open {
				log.Println("channel closed")
				return
			}
			if delivered := session.writeNotificationToClient(location, state); !delivered {
				return
			}
		}
	}
}
// writeNotificationToClient renders the State component, built from the agents
// and clients of notification and from location, to the session's connection.
// It returns true on success; a render error is logged and reported as false.
func (session *WebSession) writeNotificationToClient(location *time.Location, notification *models.State) bool {
	agents, clients := notification.Slices()
	view := State(agents, clients, location)

	if err := view.Render(context.Background(), session.conn); err != nil {
		log.Printf("WS connection closed: %v", err)
		return false
	}
	return true
}
// SessionClosed removes session from the registry and logs the number of
// sessions that remain. Removing a session that is not registered is a no-op
// apart from the log line.
func (sessions *WebSessions) SessionClosed(session *WebSession) {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	delete(sessions.sessions, session)

	sessions.logSessions()
}
// logSessions logs how many sessions are currently registered. It does not
// lock the mutex itself; the callers in this file (NewSession and
// SessionClosed) invoke it while holding the mutex.
func (sessions *WebSessions) logSessions() {
	count := len(sessions.sessions)
	log.Printf("Web session count %d", count)
}