converge/pkg/server/matchmaker/websessions.go
Erik Brakkee df9409fc75 now a single websocket is opened from the webui to converge.
The browser sends a hello message anytime the user switches to the
sessions page, upon which the server sends the current state back.
This also improves the timeout handling of broken connections.
2024-08-12 23:44:07 +02:00

138 lines
3.4 KiB
Go

package matchmaker
import (
"context"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/server/templates"
"log"
"net"
"net/http"
"sync"
"time"
)
// WebSessions is the registry of active web sessions. It also remembers the
// state most recently distributed to them.
type WebSessions struct {
// mutex is locked by notifyWebSessions, NewSession and SessionClosed while
// they access the fields below.
mutex sync.Mutex
// sessions is used as a set: its keys are the registered sessions.
sessions map[*WebSession]bool
// lastNotification is the state most recently passed to notifyWebSessions;
// it stays nil until the first notification has been processed.
lastNotification *models.State
}
// WebSession holds the per-connection state of a single web client.
type WebSession struct {
// notifications queues the states that WriteNotifications renders to conn.
notifications chan *models.State
// conn is the client connection that was handed to NewSession.
conn net.Conn
// ctx is the context that was handed to NewSession; once it is done,
// notifyWebSessions no longer blocks on delivering to this session.
ctx context.Context
}
// NewWebSessions creates an empty session registry and starts a goroutine
// that forwards every state received on notifications to all registered
// sessions via notifyWebSessions.
//
// The forwarding goroutine runs until notifications is closed.
func NewWebSessions(notifications chan *models.State) *WebSessions {
	websessions := &WebSessions{
		sessions: make(map[*WebSession]bool),
	}
	go func() {
		// Ranging over the channel ends the goroutine once the channel is
		// closed. A bare receive inside an endless loop would instead spin on
		// the closed channel and push nil states to every session.
		for notification := range notifications {
			websessions.notifyWebSessions(notification)
		}
	}()
	return websessions
}
// notifyWebSessions records notification as the latest known state and offers
// it to every registered session. The registry mutex is held for the whole
// call.
//
// For each session the send blocks until either the session's notification
// channel accepts the value or the session's context is done.
func (sessions *WebSessions) notifyWebSessions(notification *models.State) {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	sessions.lastNotification = notification
	for session := range sessions.sessions {
		select {
		case session.notifications <- notification:
			// Delivered to this session's queue.
		case <-session.ctx.Done():
			// The session is finished; removing it from the registry is
			// left to the code that owns the session.
		}
	}
}
// NewSession registers a new web session for wsConnection and returns it.
//
// A goroutine is started that reads from wsConnection. The web app opens one
// websocket connection and sends a hello message asking for the latest state
// whenever a page is loaded that requires it; every successful read therefore
// queues the most recent state on the session's notification channel. The
// content of the message is not inspected. Minor issue at this time is that a
// notification is sent twice to the client.
//
// The goroutine ends when reading from wsConnection fails or when ctx is done.
func (sessions *WebSessions) NewSession(wsConnection net.Conn, ctx context.Context) *WebSession {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	session := &WebSession{
		notifications: make(chan *models.State, 10),
		conn:          wsConnection,
		ctx:           ctx,
	}
	go func() {
		// The payload is discarded, so a single buffer is reused for all reads.
		p := make([]byte, 1024)
		for {
			if _, err := wsConnection.Read(p); err != nil {
				log.Printf("Got error reading %v", err)
				return
			}

			// lastNotification is written by notifyWebSessions under the
			// mutex, so it must be read under the mutex as well.
			sessions.mutex.Lock()
			latest := sessions.lastNotification
			sessions.mutex.Unlock()

			// Do not block forever when nobody drains the channel any more:
			// give up as soon as the session's context is done.
			select {
			case <-ctx.Done():
				return
			case session.notifications <- latest:
			}
		}
	}()
	sessions.sessions[session] = true
	sessions.logSessions()
	return session
}
// GetUserLocation determines the time zone requested by the client.
//
// The zone name is taken from the "timezone" query parameter; when that is
// empty the "X-Timezone" request header is used, and when both are empty the
// name defaults to "UTC". The name is resolved with time.LoadLocation, whose
// result and error are returned unchanged.
func GetUserLocation(r *http.Request) (*time.Location, error) {
	name := r.URL.Query().Get("timezone")
	if name == "" {
		// Fall back to the header, and to UTC when that is absent as well.
		if name = r.Header.Get("X-Timezone"); name == "" {
			name = "UTC"
		}
	}
	return time.LoadLocation(name)
}
// WriteNotifications renders queued states to the client until writing fails
// or the notification channel is closed.
//
// Each state received from the session's notification channel is rendered for
// the given location. In addition, every 10 seconds a zero-length write is
// issued on the connection; if that write fails the loop ends (presumably this
// serves to detect broken connections).
//
// cancel is always invoked when the function returns, so that the context is
// canceled whenever notifications can no longer be sent to the web client.
func (session *WebSession) WriteNotifications(location *time.Location, cancel context.CancelFunc) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	defer cancel()

	for {
		select {
		case <-ticker.C:
			if _, err := session.conn.Write([]byte{}); err != nil {
				log.Printf("WS connection closed: %v", err)
				return
			}
		case state, open := <-session.notifications:
			if !open {
				log.Println("channel closed")
				return
			}
			if failed := session.writeNotificationToClient(location, state); failed {
				return
			}
		}
	}
}
// writeNotificationToClient renders notification for the given location onto
// the session's connection using the State template.
//
// It returns true when rendering failed (the error is logged) and false when
// the state was written successfully.
func (session *WebSession) writeNotificationToClient(location *time.Location, notification *models.State) bool {
	view := templates.State(notification, location)
	if err := view.Render(context.Background(), session.conn); err != nil {
		log.Printf("WS connection closed: %v", err)
		return true
	}
	return false
}
// SessionClosed removes session from the registry and logs the number of
// sessions that remain. Removing a session that is not registered is a no-op
// apart from the log line.
func (sessions *WebSessions) SessionClosed(session *WebSession) {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	delete(sessions.sessions, session)
	sessions.logSessions()
}
// logSessions logs how many web sessions are currently registered. It does
// not take the mutex itself; NewSession and SessionClosed call it while
// holding the lock.
func (sessions *WebSessions) logSessions() {
	count := len(sessions.sessions)
	log.Printf("Web session count %d", count)
}