From db094d2a13dcc42b20901a1463a6c25f238e04b0 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sat, 24 Aug 2024 23:22:48 +0200 Subject: [PATCH] reverted changes to throttler for ignoring duplicate events. This caused more problems since state is immutable but some state of agents and client is not immutable so it ignored events that were not really duplicates. This reverts commit f6b02113366231be311bde46164e00a8cd26d0ff. --- cmd/converge/notifier.go | 2 +- pkg/support/throttling/async_throttler.go | 10 ++--- .../throttling/async_throttler_test.go | 16 +++---- pkg/support/throttling/throtller_test.go | 42 ++++--------------- pkg/support/throttling/throttler.go | 27 ++++-------- 5 files changed, 30 insertions(+), 67 deletions(-) diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go index 60cab7f..168b494 100644 --- a/cmd/converge/notifier.go +++ b/cmd/converge/notifier.go @@ -7,7 +7,7 @@ import ( ) type StateNotifier struct { - throttler *throttling.AsyncThrottler[*models.State] + throttler *throttling.AsyncThrottler[models.State] webNotificationChannel chan *models.State prometheusNotificationChannel chan *models.State } diff --git a/pkg/support/throttling/async_throttler.go b/pkg/support/throttling/async_throttler.go index 48d4b94..f9c6dc3 100644 --- a/pkg/support/throttling/async_throttler.go +++ b/pkg/support/throttling/async_throttler.go @@ -5,15 +5,15 @@ import ( "time" ) -type AsyncThrottler[T comparable] struct { +type AsyncThrottler[T any] struct { throttler Throttler[T] ctx context.Context cancel context.CancelFunc - events chan T + events chan *T ticker *time.Ticker } -func NewAsyncThrottler[T comparable](notifier func(t T), +func NewAsyncThrottler[T any](notifier func(t *T), minDelay time.Duration, pollInterval time.Duration) *AsyncThrottler[T] { ctx, cancel := context.WithCancel(context.Background()) @@ -21,7 +21,7 @@ func NewAsyncThrottler[T comparable](notifier func(t T), throttler: NewThrottler[T](notifier, minDelay), ctx: ctx, cancel: cancel, - events: make(chan T), + events: make(chan *T), ticker: time.NewTicker(pollInterval), } go func() { @@ -39,7 +39,7 @@ func NewAsyncThrottler[T comparable](notifier func(t T), return &throttler } -func (throttler *AsyncThrottler[T]) Notify(value T) { +func (throttler *AsyncThrottler[T]) Notify(value *T) { throttler.events <- value } diff --git a/pkg/support/throttling/async_throttler_test.go b/pkg/support/throttling/async_throttler_test.go index cc59f22..ab60034 100644 --- a/pkg/support/throttling/async_throttler_test.go +++ b/pkg/support/throttling/async_throttler_test.go @@ -9,15 +9,15 @@ func (suite *ThrottlerTestSuite) Test_AsyncDeliverOneValue() { pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v int) { - value = v + throttler := NewAsyncThrottler[int](func(v *int) { + value = *v }, time.Second, pollInterval) defer throttler.Stop() t0 := time.Now() v := 1 currentTime.now = t0 - throttler.Notify(v) + throttler.Notify(&v) time.Sleep(2 * pollInterval) suite.Equal(v, value) @@ -33,8 +33,8 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v int) { - value = v + throttler := NewAsyncThrottler[int](func(v *int) { + value = *v }, time.Second, pollInterval) defer throttler.Stop() @@ -44,12 +44,12 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon v2 := 2 v3 := 3 currentTime.now = t0 - throttler.Notify(v1) + throttler.Notify(&v1) time.Sleep(2 * pollInterval) suite.Equal(v1, value) - throttler.Notify(v2) - throttler.Notify(v3) + throttler.Notify(&v2) + throttler.Notify(&v3) time.Sleep(2 * pollInterval) suite.Equal(v1, value) diff --git a/pkg/support/throttling/throtller_test.go b/pkg/support/throttling/throtller_test.go index cacf244..c7442a2 100644 --- a/pkg/support/throttling/throtller_test.go +++ b/pkg/support/throttling/throtller_test.go @@ -6,14 +6,14 @@ import ( func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitialized() { value := 0 - throttler := NewThrottler[int](func(v int) { - value = v + throttler := NewThrottler[int](func(v *int) { + value = *v }, 1.0) t0 := time.Now() v := 1 currentTime.now = t0 - throttler.Notify(v) + throttler.Notify(&v) suite.Equal(v, value) value = 0 // subsequent ping will not lead to a notification @@ -25,8 +25,8 @@ func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitial func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay() { value := 0 delayMs := 1000 - throttler := NewThrottler[int](func(v int) { - value = v + throttler := NewThrottler[int](func(v *int) { + value = *v }, time.Duration(delayMs)*time.Millisecond) t0 := time.Now() @@ -35,10 +35,10 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI v2 := 2 v3 := 3 currentTime.now = t0 - throttler.Notify(v1) + throttler.Notify(&v1) suite.Equal(v1, value) - throttler.Notify(v2) - throttler.Notify(v3) + throttler.Notify(&v2) + throttler.Notify(&v3) suite.Equal(v1, value) currentTime.now = t0.Add(time.Duration(delayMs-1) * time.Millisecond) @@ -55,29 +55,3 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI throttler.Ping() suite.Equal(0, value) } - -func (suite *ThrottlerTestSuite) Test_DuplicateEventsIgnored() { - value := 0 - mindelay := 1 * time.Second - throttler := NewThrottler[int](func(v int) { - value = v - }, mindelay) - - t0 := time.Now() - v := 1 - currentTime.now = t0 - throttler.Notify(v) - suite.Equal(v, value) - - // sending the same value again will not lead to a notification - value = 0 - currentTime.now = currentTime.now.Add(2 * mindelay) - throttler.Notify(v) - suite.Equal(0, value) - - // sending a different value later will lead to a new notification - currentTime.now = currentTime.now.Add(2 * mindelay) - v++ - throttler.Notify(v) - suite.Equal(v, value) -} diff --git a/pkg/support/throttling/throttler.go b/pkg/support/throttling/throttler.go index 04b6b37..a890dc3 100644 --- a/pkg/support/throttling/throttler.go +++ b/pkg/support/throttling/throttler.go @@ -6,26 +6,20 @@ import ( // Throttling notifications to prometheus and web clients // TO be used in a single-threaded manner. -// -// NOTE: the null value of type T is considered as a not-set value. -// -// The best usage of Throttler is with a pointer type. -type Throttler[T comparable] struct { +type Throttler[T any] struct { minDelay time.Duration // ucntion to call to implement the notification. - notifier func(t T) + notifier func(t *T) lastReportedTime time.Time - pendingValue T - lastValue T + pendingValue *T } -func NewThrottler[T comparable](notifier func(t T), minDelay time.Duration) Throttler[T] { +func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[T] { throttler := Throttler[T]{ minDelay: minDelay, notifier: notifier, lastReportedTime: time.Time{}, - pendingValue: *new(T), - lastValue: *new(T), + pendingValue: nil, } return throttler } @@ -34,16 +28,11 @@ func NewThrottler[T comparable](notifier func(t T), minDelay time.Duration) Thro // for the last notification to be sent. If not, it is stored as a pending event to // be sent later. New events that come in before a notification is sent override the // pending event. -func (throttler *Throttler[T]) Notify(value T) { - if throttler.lastValue == value { - // duplicate events are not sent. - return - } +func (throttler *Throttler[T]) Notify(value *T) { if clock.time().Sub(throttler.lastReportedTime) >= throttler.minDelay { throttler.notifier(value) throttler.lastReportedTime = clock.time() - throttler.lastValue = value - throttler.pendingValue = *new(T) + throttler.pendingValue = nil return } throttler.pendingValue = value @@ -52,7 +41,7 @@ func (throttler *Throttler[T]) Notify(value T) { // To be called periodically. It sends out any pending events if the time the last // notification was sent is long enough ago. func (throttler *Throttler[T]) Ping() { - if throttler.pendingValue != *new(T) { + if throttler.pendingValue != nil { throttler.Notify(throttler.pendingValue) } }