From f6b02113366231be311bde46164e00a8cd26d0ff Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sat, 24 Aug 2024 22:23:39 +0200 Subject: [PATCH] throttler now ignoring duplicate events. --- Makefile | 2 +- cmd/converge/notifier.go | 2 +- pkg/support/throttling/async_throttler.go | 10 ++--- .../throttling/async_throttler_test.go | 16 +++---- pkg/support/throttling/throtller_test.go | 42 +++++++++++++++---- pkg/support/throttling/throttler.go | 24 +++++++---- 6 files changed, 65 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index 82d42a9..1ae1c95 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ vet: fmt go vet ./... test: build - go test -count=1 -v ./... + go test -count=1 ./... build: generate vet mkdir -p bin diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go index 168b494..60cab7f 100644 --- a/cmd/converge/notifier.go +++ b/cmd/converge/notifier.go @@ -7,7 +7,7 @@ import ( ) type StateNotifier struct { - throttler *throttling.AsyncThrottler[models.State] + throttler *throttling.AsyncThrottler[*models.State] webNotificationChannel chan *models.State prometheusNotificationChannel chan *models.State } diff --git a/pkg/support/throttling/async_throttler.go b/pkg/support/throttling/async_throttler.go index f9c6dc3..48d4b94 100644 --- a/pkg/support/throttling/async_throttler.go +++ b/pkg/support/throttling/async_throttler.go @@ -5,15 +5,15 @@ import ( "time" ) -type AsyncThrottler[T any] struct { +type AsyncThrottler[T comparable] struct { throttler Throttler[T] ctx context.Context cancel context.CancelFunc - events chan *T + events chan T ticker *time.Ticker } -func NewAsyncThrottler[T any](notifier func(t *T), +func NewAsyncThrottler[T comparable](notifier func(t T), minDelay time.Duration, pollInterval time.Duration) *AsyncThrottler[T] { ctx, cancel := context.WithCancel(context.Background()) @@ -21,7 +21,7 @@ func NewAsyncThrottler[T any](notifier func(t *T), throttler: NewThrottler[T](notifier, minDelay), ctx: ctx, cancel: cancel, - events: make(chan *T), + events: make(chan T), ticker: time.NewTicker(pollInterval), } go func() { @@ -39,7 +39,7 @@ func NewAsyncThrottler[T any](notifier func(t *T), return &throttler } -func (throttler *AsyncThrottler[T]) Notify(value *T) { +func (throttler *AsyncThrottler[T]) Notify(value T) { throttler.events <- value } diff --git a/pkg/support/throttling/async_throttler_test.go b/pkg/support/throttling/async_throttler_test.go index ab60034..cc59f22 100644 --- a/pkg/support/throttling/async_throttler_test.go +++ b/pkg/support/throttling/async_throttler_test.go @@ -9,15 +9,15 @@ func (suite *ThrottlerTestSuite) Test_AsyncDeliverOneValue() { pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v *int) { - value = *v + throttler := NewAsyncThrottler[int](func(v int) { + value = v }, time.Second, pollInterval) defer throttler.Stop() t0 := time.Now() v := 1 currentTime.now = t0 - throttler.Notify(&v) + throttler.Notify(v) time.Sleep(2 * pollInterval) suite.Equal(v, value) @@ -33,8 +33,8 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v *int) { - value = *v + throttler := NewAsyncThrottler[int](func(v int) { + value = v }, time.Second, pollInterval) defer throttler.Stop() @@ -44,12 +44,12 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon v2 := 2 v3 := 3 currentTime.now = t0 - throttler.Notify(&v1) + throttler.Notify(v1) time.Sleep(2 * pollInterval) suite.Equal(v1, value) - throttler.Notify(&v2) - throttler.Notify(&v3) + throttler.Notify(v2) + throttler.Notify(v3) time.Sleep(2 * pollInterval) suite.Equal(v1, value) diff --git a/pkg/support/throttling/throtller_test.go b/pkg/support/throttling/throtller_test.go index c7442a2..cacf244 100644 --- a/pkg/support/throttling/throtller_test.go +++ b/pkg/support/throttling/throtller_test.go @@ -6,14 +6,14 @@ import ( func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitialized() { value := 0 - throttler := NewThrottler[int](func(v *int) { - value = *v + throttler := NewThrottler[int](func(v int) { + value = v }, 1.0) t0 := time.Now() v := 1 currentTime.now = t0 - throttler.Notify(&v) + throttler.Notify(v) suite.Equal(v, value) value = 0 // subsequent ping will not lead to a notification @@ -25,8 +25,8 @@ func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitial func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay() { value := 0 delayMs := 1000 - throttler := NewThrottler[int](func(v *int) { - value = *v + throttler := NewThrottler[int](func(v int) { + value = v }, time.Duration(delayMs)*time.Millisecond) t0 := time.Now() @@ -35,10 +35,10 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI v2 := 2 v3 := 3 currentTime.now = t0 - throttler.Notify(&v1) + throttler.Notify(v1) suite.Equal(v1, value) - throttler.Notify(&v2) - throttler.Notify(&v3) + throttler.Notify(v2) + throttler.Notify(v3) suite.Equal(v1, value) currentTime.now = t0.Add(time.Duration(delayMs-1) * time.Millisecond) @@ -55,3 +55,29 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI throttler.Ping() suite.Equal(0, value) } + +func (suite *ThrottlerTestSuite) Test_DuplicateEventsIgnored() { + value := 0 + mindelay := 1 * time.Second + throttler := NewThrottler[int](func(v int) { + value = v + }, mindelay) + + t0 := time.Now() + v := 1 + currentTime.now = t0 + throttler.Notify(v) + suite.Equal(v, value) + + // sending the same value again will not lead to a notification + value = 0 + currentTime.now = currentTime.now.Add(2 * mindelay) + throttler.Notify(v) + suite.Equal(0, value) + + // sending a different value later will lead to a new notification + currentTime.now = currentTime.now.Add(2 * mindelay) + v++ + throttler.Notify(v) + suite.Equal(v, value) +} diff --git a/pkg/support/throttling/throttler.go b/pkg/support/throttling/throttler.go index a890dc3..f8e94e9 100644 --- a/pkg/support/throttling/throttler.go +++ b/pkg/support/throttling/throttler.go @@ -6,20 +6,22 @@ import ( // Throttling notifications to prometheus and web clients // TO be used in a single-threaded manner. -type Throttler[T any] struct { +type Throttler[T comparable] struct { minDelay time.Duration // ucntion to call to implement the notification. - notifier func(t *T) + notifier func(t T) lastReportedTime time.Time - pendingValue *T + pendingValue T + lastValue T } -func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[T] { +func NewThrottler[T comparable](notifier func(t T), minDelay time.Duration) Throttler[T] { throttler := Throttler[T]{ minDelay: minDelay, notifier: notifier, lastReportedTime: time.Time{}, - pendingValue: nil, + pendingValue: *new(T), + lastValue: *new(T), } return throttler } @@ -28,11 +30,16 @@ func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[ // for the last notification to be sent. If not, it is stored as a pending event to // be sent later. New events that come in before a notification is sent override the // pending event. -func (throttler *Throttler[T]) Notify(value *T) { +func (throttler *Throttler[T]) Notify(value T) { + if throttler.lastValue == value { + // duplicate events are not sent. + return + } if clock.time().Sub(throttler.lastReportedTime) >= throttler.minDelay { throttler.notifier(value) throttler.lastReportedTime = clock.time() - throttler.pendingValue = nil + throttler.lastValue = value + throttler.pendingValue = *new(T) return } throttler.pendingValue = value @@ -41,7 +48,8 @@ func (throttler *Throttler[T]) Notify(value *T) { // To be called periodically. It sends out any pending events if the time the last // notification was sent is long enough ago. func (throttler *Throttler[T]) Ping() { - if throttler.pendingValue != nil { + if throttler.pendingValue != *new(T) { + // TODO improve: use a better way to recognize that there is a pending value. throttler.Notify(throttler.pendingValue) } }