package matchmaker

import (
	"context"
	"log"
	"net"
	"net/http"
	"sync"
	"time"

	"git.wamblee.org/converge/pkg/models"
	"git.wamblee.org/converge/pkg/server/templates"
)

// WebSessions tracks the currently registered web sessions and remembers the
// most recent state notification so it can be replayed to a client on request.
type WebSessions struct {
	// mutex guards sessions and lastNotification.
	mutex            sync.Mutex
	sessions         map[*WebSession]bool
	lastNotification *models.State
}

// WebSession is one client connection together with the queue of state
// notifications that still have to be written to it.
type WebSession struct {
	notifications chan *models.State
	conn          net.Conn
	ctx           context.Context
}

// NewWebSessions creates an empty session registry and starts a goroutine that
// forwards every state received on notifications to all registered sessions.
// The goroutine ends when notifications is closed.
func NewWebSessions(notifications chan *models.State) *WebSessions {
	websessions := &WebSessions{
		sessions: make(map[*WebSession]bool),
	}

	go func() {
		// Ranging (instead of a bare receive in an endless loop) stops the
		// goroutine when the channel is closed; a bare receive would spin
		// forever delivering nil states.
		for notification := range notifications {
			websessions.notifyWebSessions(notification)
		}
	}()
	return websessions
}

// notifyWebSessions records notification as the latest state and queues it for
// every registered session. The mutex is held for the whole fan-out, so a
// session whose queue is full blocks delivery until it drains or its context
// is done.
func (sessions *WebSessions) notifyWebSessions(notification *models.State) {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	sessions.lastNotification = notification
	for session := range sessions.sessions {
		select {
		case <-session.ctx.Done():
			// session is closed, will be removed at higher level when session is done.
		case session.notifications <- notification:
			// Sent notification
		}
	}
}

// NewSession registers a session for wsConnection and starts a goroutine that
// reads from the connection. Every successful read is treated as a request for
// the latest state, which is then queued for the session. A read error calls
// cancel and ends the goroutine; the goroutine also ends once ctx is done.
func (sessions *WebSessions) NewSession(wsConnection net.Conn, ctx context.Context, cancel context.CancelFunc) *WebSession {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	session := &WebSession{
		notifications: make(chan *models.State, 10),
		conn:          wsConnection,
		ctx:           ctx,
	}

	go func() {
		// The content of the message is ignored, so one buffer is reused for
		// all reads.
		p := make([]byte, 1024)
		for {
			// the web app opens one websocket connection and sends a hello
			// message asking for the latest state when a page is loaded that requires this.
			// Minor issue at this time is that a notification is sent twice to the client.
			if _, err := wsConnection.Read(p); err != nil {
				log.Printf("Got error reading %v", err)
				cancel()
				return
			}

			// lastNotification is written under the mutex by
			// notifyWebSessions, so it must be read under the mutex as well.
			// The lock is released before sending so that a full queue can
			// never block other users of the registry.
			sessions.mutex.Lock()
			latest := sessions.lastNotification
			sessions.mutex.Unlock()

			// NOTE(review): latest is nil when no state has been received
			// yet; it is forwarded as-is, as before — confirm the template
			// copes with a nil state.
			select {
			case <-ctx.Done():
				// Nobody will drain the queue anymore; do not block forever.
				return
			case session.notifications <- latest:
			}
		}
	}()

	sessions.sessions[session] = true
	sessions.logSessions()
	return session
}

// GetUserLocation determines the time zone for a request. It uses the
// "timezone" query parameter, then the "X-Timezone" header, and falls back to
// "UTC" when neither is set. The error is the one returned by
// time.LoadLocation for the chosen name.
func GetUserLocation(r *http.Request) (*time.Location, error) {
	tzName := r.URL.Query().Get("timezone")
	if tzName == "" {
		tzName = r.Header.Get("X-Timezone")
	}
	//log.Printf("Got timezone from request %v %v", tzName, r.URL.Path)
	if tzName == "" {
		tzName = "UTC"
	}
	return time.LoadLocation(tzName)
}

// WriteNotifications writes queued notifications to the session's connection,
// rendered for location, until ctx is done, the queue is closed, or a write
// fails. Every 10 seconds an empty write is issued to detect a dead
// connection. cancel is always called on return.
func (session *WebSession) WriteNotifications(location *time.Location, ctx context.Context, cancel context.CancelFunc) {
	timer := time.NewTicker(10 * time.Second)
	defer timer.Stop()
	// if for some reason we cannot send notifications to the web client then the context is canceled.
	defer cancel()

	// Zero-length, non-nil payload used for the periodic connection check.
	probe := []byte{}

	for {
		select {
		case <-ctx.Done():
			return
		case notification, ok := <-session.notifications:
			if !ok {
				log.Println("channel closed")
				return
			}
			if session.writeNotificationToClient(location, notification) {
				return
			}
		case <-timer.C:
			if _, err := session.conn.Write(probe); err != nil {
				log.Printf("WS connection closed: %v", err)
				return
			}
		}
	}
}

// writeNotificationToClient renders notification for location onto the
// session's connection. It reports true when rendering failed, which the
// caller treats as a closed connection.
func (session *WebSession) writeNotificationToClient(location *time.Location, notification *models.State) bool {
	err := templates.State(notification, location).Render(context.Background(), session.conn)
	if err != nil {
		log.Printf("WS connection closed: %v", err)
		return true
	}
	return false
}

// SessionClosed removes session from the registry and logs the new session
// count.
func (sessions *WebSessions) SessionClosed(session *WebSession) {
	sessions.mutex.Lock()
	defer sessions.mutex.Unlock()

	delete(sessions.sessions, session)
	sessions.logSessions()
}

// logSessions logs the number of registered sessions. Both callers in this
// file hold the mutex while calling it.
func (sessions *WebSessions) logSessions() {
	log.Printf("Web session count %d", len(sessions.sessions))
}