package ui import ( "context" "git.wamblee.org/converge/pkg/models" "git.wamblee.org/converge/pkg/support/throttling" "log" "net" "net/http" "sync" "time" ) type WebSessions struct { mutex sync.Mutex sessions map[*WebSession]bool lastNotification *models.State } type WebSession struct { notifications chan *models.State conn net.Conn ctx context.Context } func NewWebSessions(notifications chan *models.State) *WebSessions { websessions := &WebSessions{ sessions: make(map[*WebSession]bool), lastNotification: models.NewState(), } go func() { for { notification := <-notifications websessions.notifyWebSessions(notification) } }() return websessions } func logRequestMetadata(r *http.Request) { log.Printf("Method: %s", r.Method) log.Printf("URL: %s", r.URL) log.Printf("Protocol: %s", r.Proto) log.Printf("Host: %s", r.Host) log.Printf("RemoteAddr: %s", r.RemoteAddr) log.Printf("RequestURI: %s", r.RequestURI) log.Printf("Headers: %v", r.Header) // Log query parameters log.Printf("Query parameters: %v", r.URL.Query()) // Log form data (if applicable) r.ParseForm() log.Printf("Form data: %v", r.Form) } func (sessions *WebSessions) notifyWebSessions(notification *models.State) { sessions.mutex.Lock() defer sessions.mutex.Unlock() sessions.lastNotification = notification for session, _ := range sessions.sessions { select { case <-session.ctx.Done(): // session is closed, will be removed at higher level when session is done. case session.notifications <- notification: // Sent notification } } } func (sessions *WebSessions) NewSession(wsConnection net.Conn, ctx context.Context, cancel context.CancelFunc) *WebSession { sessions.mutex.Lock() defer sessions.mutex.Unlock() session := &WebSession{ notifications: make(chan *models.State), conn: wsConnection, ctx: ctx, } // initial notification as soon as client connects. go func() { session.notifications <- sessions.lastNotification }() go func() { throttler := throttling.NewAsyncThrottler(func(state *models.State) { session.notifications <- state }, time.Second, time.Second) for { // the web app opens one websocket connection and sends a hello // message asking for the latest state when a page is loaded that requires this. // Minor issue at this time is that a notification is sent twice to the client. p := make([]byte, 1024) _, err := wsConnection.Read(p) if err == nil { throttler.Notify(sessions.lastNotification) } else { log.Printf("Got error reading %v", err) cancel() return } } }() sessions.sessions[session] = true sessions.logSessions() return session } func (session *WebSession) WriteNotifications(location *time.Location, ctx context.Context, cancel context.CancelFunc) { timer := time.NewTicker(10 * time.Second) defer timer.Stop() // if for some reason we cannot send notifications to the web client then the context is canceled. defer cancel() for { select { case <-ctx.Done(): return case notification, ok := <-session.notifications: if !ok { log.Println("channel closed") return } if !session.writeNotificationToClient(location, notification) { return } case <-timer.C: _, err := session.conn.Write(make([]byte, 0, 0)) if err != nil { log.Printf("WS connection closed: %v", err) return } } } } func (session *WebSession) writeNotificationToClient(location *time.Location, notification *models.State) bool { agents, clients := notification.Slices() err := State(agents, clients, location).Render(context.Background(), session.conn) if err != nil { log.Printf("WS connection closed: %v", err) return false } return true } func (sessions *WebSessions) SessionClosed(session *WebSession) { sessions.mutex.Lock() defer sessions.mutex.Unlock() delete(sessions.sessions, session) sessions.logSessions() } func (sessions *WebSessions) logSessions() { log.Printf("Web session count %d", len(sessions.sessions)) }