Fixed issue with throttling go routine not terminating.

This commit is contained in:
Erik Brakkee 2024-09-26 19:14:27 +02:00
parent 911dd40858
commit c3e77e6ef3
4 changed files with 14 additions and 16 deletions

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"git.wamblee.org/converge/pkg/models" "git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/support/throttling" "git.wamblee.org/converge/pkg/support/throttling"
"time" "time"
@ -17,7 +18,8 @@ func NewStateNotifier(minDelay time.Duration) *StateNotifier {
webNotificationChannel: make(chan *models.State), webNotificationChannel: make(chan *models.State),
prometheusNotificationChannel: make(chan *models.State), prometheusNotificationChannel: make(chan *models.State),
} }
notifier.throttler = throttling.NewAsyncThrottler(func(state *models.State) { // Runs indefinitely, context does not need to be canceled.
notifier.throttler = throttling.NewAsyncThrottler(context.Background(), func(state *models.State) {
notifier.webNotificationChannel <- state notifier.webNotificationChannel <- state
notifier.prometheusNotificationChannel <- state notifier.prometheusNotificationChannel <- state
}, minDelay, 1*time.Second) }, minDelay, 1*time.Second)

View File

@ -84,9 +84,10 @@ func (sessions *WebSessions) NewSession(ctx context.Context, cancel context.Canc
}() }()
go func() { go func() {
throttler := throttling.NewAsyncThrottler(func(state *models.State) { throttler := throttling.NewAsyncThrottler(ctx, func(state *models.State) {
session.notifications <- state session.notifications <- state
}, 500*time.Millisecond, 500*time.Millisecond) }, 500*time.Millisecond, 500*time.Millisecond)
defer cancel()
for { for {
// the web app opens one websocket connection and sends a hello // the web app opens one websocket connection and sends a hello
// message asking for the latest state when a page is loaded that requires this. // message asking for the latest state when a page is loaded that requires this.
@ -97,7 +98,6 @@ func (sessions *WebSessions) NewSession(ctx context.Context, cancel context.Canc
throttler.Notify(sessions.lastNotification) throttler.Notify(sessions.lastNotification)
} else { } else {
log.Printf("Got error reading %v", err) log.Printf("Got error reading %v", err)
cancel()
return return
} }
} }

View File

@ -8,23 +8,21 @@ import (
type AsyncThrottler[T any] struct { type AsyncThrottler[T any] struct {
throttler Throttler[T] throttler Throttler[T]
ctx context.Context ctx context.Context
cancel context.CancelFunc
events chan *T events chan *T
ticker *time.Ticker ticker *time.Ticker
} }
func NewAsyncThrottler[T any](notifier func(t *T), func NewAsyncThrottler[T any](ctx context.Context, notifier func(t *T),
minDelay time.Duration, minDelay time.Duration,
pollInterval time.Duration) *AsyncThrottler[T] { pollInterval time.Duration) *AsyncThrottler[T] {
ctx, cancel := context.WithCancel(context.Background())
throttler := AsyncThrottler[T]{ throttler := AsyncThrottler[T]{
throttler: NewThrottler[T](notifier, minDelay), throttler: NewThrottler[T](notifier, minDelay),
ctx: ctx, ctx: ctx,
cancel: cancel,
events: make(chan *T), events: make(chan *T),
ticker: time.NewTicker(pollInterval), ticker: time.NewTicker(pollInterval),
} }
go func() { go func() {
defer throttler.ticker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -42,8 +40,3 @@ func NewAsyncThrottler[T any](notifier func(t *T),
func (throttler *AsyncThrottler[T]) Notify(value *T) { func (throttler *AsyncThrottler[T]) Notify(value *T) {
throttler.events <- value throttler.events <- value
} }
func (throttler *AsyncThrottler[T]) Stop() {
throttler.cancel()
throttler.ticker.Stop()
}

View File

@ -1,6 +1,7 @@
package throttling package throttling
import ( import (
"context"
"time" "time"
) )
@ -9,10 +10,11 @@ func (suite *ThrottlerTestSuite) Test_AsyncDeliverOneValue() {
pollInterval := 10 * time.Millisecond pollInterval := 10 * time.Millisecond
throttler := NewAsyncThrottler[int](func(v *int) { ctx, cancel := context.WithCancel(context.Background())
throttler := NewAsyncThrottler[int](ctx, func(v *int) {
value = *v value = *v
}, time.Second, pollInterval) }, time.Second, pollInterval)
defer throttler.Stop() defer cancel()
t0 := time.Now() t0 := time.Now()
v := 1 v := 1
@ -33,10 +35,11 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon
pollInterval := 10 * time.Millisecond pollInterval := 10 * time.Millisecond
throttler := NewAsyncThrottler[int](func(v *int) { ctx, cancel := context.WithCancel(context.Background())
throttler := NewAsyncThrottler[int](ctx, func(v *int) {
value = *v value = *v
}, time.Second, pollInterval) }, time.Second, pollInterval)
defer throttler.Stop() defer cancel()
t0 := time.Now() t0 := time.Now()
v1 := 1 v1 := 1