diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go index 168b494..48dfcd0 100644 --- a/cmd/converge/notifier.go +++ b/cmd/converge/notifier.go @@ -1,6 +1,7 @@ package main import ( + "context" "git.wamblee.org/converge/pkg/models" "git.wamblee.org/converge/pkg/support/throttling" "time" @@ -17,7 +18,8 @@ func NewStateNotifier(minDelay time.Duration) *StateNotifier { webNotificationChannel: make(chan *models.State), prometheusNotificationChannel: make(chan *models.State), } - notifier.throttler = throttling.NewAsyncThrottler(func(state *models.State) { + // Runs indefinitely, context does not need to be canceled. + notifier.throttler = throttling.NewAsyncThrottler(context.Background(), func(state *models.State) { notifier.webNotificationChannel <- state notifier.prometheusNotificationChannel <- state }, minDelay, 1*time.Second) diff --git a/pkg/server/ui/websessions.go b/pkg/server/ui/websessions.go index 94751c7..2d46120 100644 --- a/pkg/server/ui/websessions.go +++ b/pkg/server/ui/websessions.go @@ -84,9 +84,10 @@ func (sessions *WebSessions) NewSession(ctx context.Context, cancel context.Canc }() go func() { - throttler := throttling.NewAsyncThrottler(func(state *models.State) { + throttler := throttling.NewAsyncThrottler(ctx, func(state *models.State) { session.notifications <- state }, 500*time.Millisecond, 500*time.Millisecond) + defer cancel() for { // the web app opens one websocket connection and sends a hello // message asking for the latest state when a page is loaded that requires this. @@ -97,7 +98,6 @@ func (sessions *WebSessions) NewSession(ctx context.Context, cancel context.Canc throttler.Notify(sessions.lastNotification) } else { log.Printf("Got error reading %v", err) - cancel() return } } diff --git a/pkg/support/throttling/async_throttler.go b/pkg/support/throttling/async_throttler.go index f9c6dc3..214b826 100644 --- a/pkg/support/throttling/async_throttler.go +++ b/pkg/support/throttling/async_throttler.go @@ -8,23 +8,21 @@ import ( type AsyncThrottler[T any] struct { throttler Throttler[T] ctx context.Context - cancel context.CancelFunc events chan *T ticker *time.Ticker } -func NewAsyncThrottler[T any](notifier func(t *T), +func NewAsyncThrottler[T any](ctx context.Context, notifier func(t *T), minDelay time.Duration, pollInterval time.Duration) *AsyncThrottler[T] { - ctx, cancel := context.WithCancel(context.Background()) throttler := AsyncThrottler[T]{ throttler: NewThrottler[T](notifier, minDelay), ctx: ctx, - cancel: cancel, events: make(chan *T), ticker: time.NewTicker(pollInterval), } go func() { + defer throttler.ticker.Stop() for { select { case <-ctx.Done(): @@ -42,8 +40,3 @@ func NewAsyncThrottler[T any](notifier func(t *T), func (throttler *AsyncThrottler[T]) Notify(value *T) { throttler.events <- value } - -func (throttler *AsyncThrottler[T]) Stop() { - throttler.cancel() - throttler.ticker.Stop() -} diff --git a/pkg/support/throttling/async_throttler_test.go b/pkg/support/throttling/async_throttler_test.go index ab60034..ab1d358 100644 --- a/pkg/support/throttling/async_throttler_test.go +++ b/pkg/support/throttling/async_throttler_test.go @@ -1,6 +1,7 @@ package throttling import ( + "context" "time" ) @@ -9,10 +10,11 @@ func (suite *ThrottlerTestSuite) Test_AsyncDeliverOneValue() { pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v *int) { + ctx, cancel := context.WithCancel(context.Background()) + throttler := NewAsyncThrottler[int](ctx, func(v *int) { value = *v }, time.Second, pollInterval) - defer throttler.Stop() + defer cancel() t0 := time.Now() v := 1 @@ -33,10 +35,11 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v *int) { + ctx, cancel := context.WithCancel(context.Background()) + throttler := NewAsyncThrottler[int](ctx, func(v *int) { value = *v }, time.Second, pollInterval) - defer throttler.Stop() + defer cancel() t0 := time.Now() v1 := 1