From fd064f249fe602d84f0fa05f4d910feb7c9ef9ff Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Tue, 13 Aug 2024 21:33:29 +0200 Subject: [PATCH] using unbuffered channels everywhere now. Only change required was to initialize prometeus and the websessions before the matchmaker. This is because at startup the matchmaker wants to write a notification when it starts up but then prometheus and the websessions would not be there to read them. Alternative solution would be to run all initialization code in go routines to make it independent of initialization order but having a defined initialization order is much cleaner. --- cmd/converge/converge.go | 22 ++++++++++++++-------- cmd/converge/notifier.go | 4 ++-- cmd/converge/prometheus.go | 2 +- cmd/fsnotifytest/monitor.go | 2 +- pkg/agent/session/session.go | 2 +- pkg/comms/gobchannel.go | 8 ++++---- pkg/server/matchmaker/websessions.go | 2 +- 7 files changed, 24 insertions(+), 18 deletions(-) diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go index a5daa1c..bdc541b 100644 --- a/cmd/converge/converge.go +++ b/cmd/converge/converge.go @@ -58,6 +58,13 @@ func printHelp(msg string) { } func main() { + // for debugging hangs relating to using unbuffered channels + //go func() { + // buf := make([]byte, 1000000) + // n := runtime.Stack(buf, true) + // time.Sleep(10 * time.Second) + // log.Println("LOG: \n" + string(buf[:n])) + //}() downloaddir := "." staticdir := "../static" @@ -96,8 +103,14 @@ func main() { } notifications := NewStateNotifier() - admin := matchmaker.NewMatchMaker(notifications) websessions := matchmaker.NewWebSessions(notifications.webNotificationChannel) + // monitoring + mux := http.NewServeMux() + setupPrometheus(mux, notifications.prometheusNotificationChannel) + go func() { + log.Fatal(http.ListenAndServe(":8001", mux)) + }() + admin := matchmaker.NewMatchMaker(notifications) // For agents connecting registrationService := websocketutil.WebSocketService{ @@ -167,13 +180,6 @@ func main() { http.HandleFunc("/", catchAllHandler(contextpath)) - // monitoring - mux := http.NewServeMux() - setupPrometheus(mux, notifications.prometheusNotificationChannel) - go func() { - log.Fatal(http.ListenAndServe(":8001", mux)) - }() - // Start HTTP server fmt.Println("Rendez-vous server listening on :8000") log.Fatal(http.ListenAndServe(":8000", nil)) diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go index 008c803..241b804 100644 --- a/cmd/converge/notifier.go +++ b/cmd/converge/notifier.go @@ -9,8 +9,8 @@ type StateNotifier struct { func NewStateNotifier() *StateNotifier { return &StateNotifier{ - webNotificationChannel: make(chan *models.State, 10), - prometheusNotificationChannel: make(chan *models.State, 10), + webNotificationChannel: make(chan *models.State), + prometheusNotificationChannel: make(chan *models.State), } } diff --git a/cmd/converge/prometheus.go b/cmd/converge/prometheus.go index 9869529..109ea85 100644 --- a/cmd/converge/prometheus.go +++ b/cmd/converge/prometheus.go @@ -191,7 +191,7 @@ func setupPrometheus(mux *http.ServeMux, notifications chan *models.State) { mux.Handle("/metrics", promhttp.Handler()) } -var prometheusChannel = make(chan func(), 10) +var prometheusChannel = make(chan func()) // serialize notifidcations and periodi updates of the duration. diff --git a/cmd/fsnotifytest/monitor.go b/cmd/fsnotifytest/monitor.go index 9b450b4..7d1622b 100644 --- a/cmd/fsnotifytest/monitor.go +++ b/cmd/fsnotifytest/monitor.go @@ -54,5 +54,5 @@ func main() { } // Block main goroutine forever - <-make(chan struct{}, 10) + <-make(chan struct{}) } diff --git a/pkg/agent/session/session.go b/pkg/agent/session/session.go index 4b0d5b2..89b745f 100644 --- a/pkg/agent/session/session.go +++ b/pkg/agent/session/session.go @@ -69,7 +69,7 @@ var helpMessageTemplate string var helpMessage = formatHelpMessage() // Events channel for asynchronous events. -var events = make(chan func(), 10) +var events = make(chan func()) // External interface, asynchronous, apart from the initialization. diff --git a/pkg/comms/gobchannel.go b/pkg/comms/gobchannel.go index 601126d..854112c 100644 --- a/pkg/comms/gobchannel.go +++ b/pkg/comms/gobchannel.go @@ -64,8 +64,8 @@ func ReceiveAsync[T any](channel GOBChannel, result chan T, errors chan<- error) } func SendWithTimeout[T any](channel GOBChannel, obj T) error { - done := make(chan bool, 10) - errors := make(chan error, 10) + done := make(chan bool) + errors := make(chan error) SendAsync(channel, obj, done, errors) select { @@ -79,8 +79,8 @@ func SendWithTimeout[T any](channel GOBChannel, obj T) error { } func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) { - result := make(chan T, 10) - errors := make(chan error, 10) + result := make(chan T) + errors := make(chan error) ReceiveAsync(channel, result, errors) select { diff --git a/pkg/server/matchmaker/websessions.go b/pkg/server/matchmaker/websessions.go index c6ec68a..83c4a92 100644 --- a/pkg/server/matchmaker/websessions.go +++ b/pkg/server/matchmaker/websessions.go @@ -73,7 +73,7 @@ func (sessions *WebSessions) NewSession(wsConnection net.Conn, ctx context.Conte sessions.mutex.Lock() defer sessions.mutex.Unlock() session := &WebSession{ - notifications: make(chan *models.State, 10), + notifications: make(chan *models.State), conn: wsConnection, ctx: ctx, }