From c3e77e6ef375e783e3b8345ad5866ee2753e032f Mon Sep 17 00:00:00 2001
From: Erik Brakkee <Erik.Brakkee@vanderlande.com>
Date: Thu, 26 Sep 2024 19:14:27 +0200
Subject: [PATCH] Fixed issue with throttling go routine not terminating.

---
 cmd/converge/notifier.go                       |  4 +++-
 pkg/server/ui/websessions.go                   |  4 ++--
 pkg/support/throttling/async_throttler.go      | 11 ++---------
 pkg/support/throttling/async_throttler_test.go | 11 +++++++----
 4 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go
index 168b494..48dfcd0 100644
--- a/cmd/converge/notifier.go
+++ b/cmd/converge/notifier.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"git.wamblee.org/converge/pkg/models"
 	"git.wamblee.org/converge/pkg/support/throttling"
 	"time"
@@ -17,7 +18,8 @@ func NewStateNotifier(minDelay time.Duration) *StateNotifier {
 		webNotificationChannel:        make(chan *models.State),
 		prometheusNotificationChannel: make(chan *models.State),
 	}
-	notifier.throttler = throttling.NewAsyncThrottler(func(state *models.State) {
+	// Runs indefinitely, context does not need to be canceled.
+	notifier.throttler = throttling.NewAsyncThrottler(context.Background(), func(state *models.State) {
 		notifier.webNotificationChannel <- state
 		notifier.prometheusNotificationChannel <- state
 	}, minDelay, 1*time.Second)
diff --git a/pkg/server/ui/websessions.go b/pkg/server/ui/websessions.go
index 94751c7..2d46120 100644
--- a/pkg/server/ui/websessions.go
+++ b/pkg/server/ui/websessions.go
@@ -84,9 +84,10 @@ func (sessions *WebSessions) NewSession(ctx context.Context, cancel context.Canc
 	}()
 
 	go func() {
-		throttler := throttling.NewAsyncThrottler(func(state *models.State) {
+		throttler := throttling.NewAsyncThrottler(ctx, func(state *models.State) {
 			session.notifications <- state
 		}, 500*time.Millisecond, 500*time.Millisecond)
+		defer cancel()
 		for {
 			// the web app opens one websocket connection and sends a hello
 			// message asking for the latest state when a page is loaded that requires this.
@@ -97,7 +98,6 @@ func (sessions *WebSessions) NewSession(ctx context.Context, cancel context.Canc
 				throttler.Notify(sessions.lastNotification)
 			} else {
 				log.Printf("Got error reading %v", err)
-				cancel()
 				return
 			}
 		}
diff --git a/pkg/support/throttling/async_throttler.go b/pkg/support/throttling/async_throttler.go
index f9c6dc3..214b826 100644
--- a/pkg/support/throttling/async_throttler.go
+++ b/pkg/support/throttling/async_throttler.go
@@ -8,23 +8,21 @@ import (
 type AsyncThrottler[T any] struct {
 	throttler Throttler[T]
 	ctx       context.Context
-	cancel    context.CancelFunc
 	events    chan *T
 	ticker    *time.Ticker
 }
 
-func NewAsyncThrottler[T any](notifier func(t *T),
+func NewAsyncThrottler[T any](ctx context.Context, notifier func(t *T),
 	minDelay time.Duration,
 	pollInterval time.Duration) *AsyncThrottler[T] {
-	ctx, cancel := context.WithCancel(context.Background())
 	throttler := AsyncThrottler[T]{
 		throttler: NewThrottler[T](notifier, minDelay),
 		ctx:       ctx,
-		cancel:    cancel,
 		events:    make(chan *T),
 		ticker:    time.NewTicker(pollInterval),
 	}
 	go func() {
+		defer throttler.ticker.Stop()
 		for {
 			select {
 			case <-ctx.Done():
@@ -42,8 +40,3 @@ func NewAsyncThrottler[T any](notifier func(t *T),
 func (throttler *AsyncThrottler[T]) Notify(value *T) {
 	throttler.events <- value
 }
-
-func (throttler *AsyncThrottler[T]) Stop() {
-	throttler.cancel()
-	throttler.ticker.Stop()
-}
diff --git a/pkg/support/throttling/async_throttler_test.go b/pkg/support/throttling/async_throttler_test.go
index ab60034..ab1d358 100644
--- a/pkg/support/throttling/async_throttler_test.go
+++ b/pkg/support/throttling/async_throttler_test.go
@@ -1,6 +1,7 @@
 package throttling
 
 import (
+	"context"
 	"time"
 )
 
@@ -9,10 +10,11 @@ func (suite *ThrottlerTestSuite) Test_AsyncDeliverOneValue() {
 
 	pollInterval := 10 * time.Millisecond
 
-	throttler := NewAsyncThrottler[int](func(v *int) {
+	ctx, cancel := context.WithCancel(context.Background())
+	throttler := NewAsyncThrottler[int](ctx, func(v *int) {
 		value = *v
 	}, time.Second, pollInterval)
-	defer throttler.Stop()
+	defer cancel()
 
 	t0 := time.Now()
 	v := 1
@@ -33,10 +35,11 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon
 
 	pollInterval := 10 * time.Millisecond
 
-	throttler := NewAsyncThrottler[int](func(v *int) {
+	ctx, cancel := context.WithCancel(context.Background())
+	throttler := NewAsyncThrottler[int](ctx, func(v *int) {
 		value = *v
 	}, time.Second, pollInterval)
-	defer throttler.Stop()
+	defer cancel()
 
 	t0 := time.Now()
 	v1 := 1