diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go index 60cab7f..168b494 100644 --- a/cmd/converge/notifier.go +++ b/cmd/converge/notifier.go @@ -7,7 +7,7 @@ import ( ) type StateNotifier struct { - throttler *throttling.AsyncThrottler[*models.State] + throttler *throttling.AsyncThrottler[models.State] webNotificationChannel chan *models.State prometheusNotificationChannel chan *models.State } diff --git a/pkg/support/throttling/async_throttler.go b/pkg/support/throttling/async_throttler.go index 48d4b94..f9c6dc3 100644 --- a/pkg/support/throttling/async_throttler.go +++ b/pkg/support/throttling/async_throttler.go @@ -5,15 +5,15 @@ import ( "time" ) -type AsyncThrottler[T comparable] struct { +type AsyncThrottler[T any] struct { throttler Throttler[T] ctx context.Context cancel context.CancelFunc - events chan T + events chan *T ticker *time.Ticker } -func NewAsyncThrottler[T comparable](notifier func(t T), +func NewAsyncThrottler[T any](notifier func(t *T), minDelay time.Duration, pollInterval time.Duration) *AsyncThrottler[T] { ctx, cancel := context.WithCancel(context.Background()) @@ -21,7 +21,7 @@ func NewAsyncThrottler[T comparable](notifier func(t T), throttler: NewThrottler[T](notifier, minDelay), ctx: ctx, cancel: cancel, - events: make(chan T), + events: make(chan *T), ticker: time.NewTicker(pollInterval), } go func() { @@ -39,7 +39,7 @@ func NewAsyncThrottler[T comparable](notifier func(t T), return &throttler } -func (throttler *AsyncThrottler[T]) Notify(value T) { +func (throttler *AsyncThrottler[T]) Notify(value *T) { throttler.events <- value } diff --git a/pkg/support/throttling/async_throttler_test.go b/pkg/support/throttling/async_throttler_test.go index cc59f22..ab60034 100644 --- a/pkg/support/throttling/async_throttler_test.go +++ b/pkg/support/throttling/async_throttler_test.go @@ -9,15 +9,15 @@ func (suite *ThrottlerTestSuite) Test_AsyncDeliverOneValue() { pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v int) { - value = v + throttler := NewAsyncThrottler[int](func(v *int) { + value = *v }, time.Second, pollInterval) defer throttler.Stop() t0 := time.Now() v := 1 currentTime.now = t0 - throttler.Notify(v) + throttler.Notify(&v) time.Sleep(2 * pollInterval) suite.Equal(v, value) @@ -33,8 +33,8 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon pollInterval := 10 * time.Millisecond - throttler := NewAsyncThrottler[int](func(v int) { - value = v + throttler := NewAsyncThrottler[int](func(v *int) { + value = *v }, time.Second, pollInterval) defer throttler.Stop() @@ -44,12 +44,12 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon v2 := 2 v3 := 3 currentTime.now = t0 - throttler.Notify(v1) + throttler.Notify(&v1) time.Sleep(2 * pollInterval) suite.Equal(v1, value) - throttler.Notify(v2) - throttler.Notify(v3) + throttler.Notify(&v2) + throttler.Notify(&v3) time.Sleep(2 * pollInterval) suite.Equal(v1, value) diff --git a/pkg/support/throttling/throtller_test.go b/pkg/support/throttling/throtller_test.go index cacf244..c7442a2 100644 --- a/pkg/support/throttling/throtller_test.go +++ b/pkg/support/throttling/throtller_test.go @@ -6,14 +6,14 @@ import ( func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitialized() { value := 0 - throttler := NewThrottler[int](func(v int) { - value = v + throttler := NewThrottler[int](func(v *int) { + value = *v }, 1.0) t0 := time.Now() v := 1 currentTime.now = t0 - throttler.Notify(v) + throttler.Notify(&v) suite.Equal(v, value) value = 0 // subsequent ping will not lead to a notification @@ -25,8 +25,8 @@ func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitial func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay() { value := 0 delayMs := 1000 - throttler := NewThrottler[int](func(v int) { - value = v + throttler := NewThrottler[int](func(v *int) { + value = *v }, time.Duration(delayMs)*time.Millisecond) t0 := time.Now() @@ -35,10 +35,10 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI v2 := 2 v3 := 3 currentTime.now = t0 - throttler.Notify(v1) + throttler.Notify(&v1) suite.Equal(v1, value) - throttler.Notify(v2) - throttler.Notify(v3) + throttler.Notify(&v2) + throttler.Notify(&v3) suite.Equal(v1, value) currentTime.now = t0.Add(time.Duration(delayMs-1) * time.Millisecond) @@ -55,29 +55,3 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI throttler.Ping() suite.Equal(0, value) } - -func (suite *ThrottlerTestSuite) Test_DuplicateEventsIgnored() { - value := 0 - mindelay := 1 * time.Second - throttler := NewThrottler[int](func(v int) { - value = v - }, mindelay) - - t0 := time.Now() - v := 1 - currentTime.now = t0 - throttler.Notify(v) - suite.Equal(v, value) - - // sending the same value again will not lead to a notification - value = 0 - currentTime.now = currentTime.now.Add(2 * mindelay) - throttler.Notify(v) - suite.Equal(0, value) - - // sending a different value later will lead to a new notification - currentTime.now = currentTime.now.Add(2 * mindelay) - v++ - throttler.Notify(v) - suite.Equal(v, value) -} diff --git a/pkg/support/throttling/throttler.go b/pkg/support/throttling/throttler.go index 04b6b37..a890dc3 100644 --- a/pkg/support/throttling/throttler.go +++ b/pkg/support/throttling/throttler.go @@ -6,26 +6,20 @@ import ( // Throttling notifications to prometheus and web clients // TO be used in a single-threaded manner. -// -// NOTE: the null value of type T is considered as a not-set value. -// -// The best usage of Throttler is with a pointer type. -type Throttler[T comparable] struct { +type Throttler[T any] struct { minDelay time.Duration // ucntion to call to implement the notification. - notifier func(t T) + notifier func(t *T) lastReportedTime time.Time - pendingValue T - lastValue T + pendingValue *T } -func NewThrottler[T comparable](notifier func(t T), minDelay time.Duration) Throttler[T] { +func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[T] { throttler := Throttler[T]{ minDelay: minDelay, notifier: notifier, lastReportedTime: time.Time{}, - pendingValue: *new(T), - lastValue: *new(T), + pendingValue: nil, } return throttler } @@ -34,16 +28,11 @@ func NewThrottler[T comparable](notifier func(t T), minDelay time.Duration) Thro // for the last notification to be sent. If not, it is stored as a pending event to // be sent later. New events that come in before a notification is sent override the // pending event. -func (throttler *Throttler[T]) Notify(value T) { - if throttler.lastValue == value { - // duplicate events are not sent. - return - } +func (throttler *Throttler[T]) Notify(value *T) { if clock.time().Sub(throttler.lastReportedTime) >= throttler.minDelay { throttler.notifier(value) throttler.lastReportedTime = clock.time() - throttler.lastValue = value - throttler.pendingValue = *new(T) + throttler.pendingValue = nil return } throttler.pendingValue = value @@ -52,7 +41,7 @@ func (throttler *Throttler[T]) Notify(value T) { // To be called periodically. It sends out any pending events if the time the last // notification was sent is long enough ago. func (throttler *Throttler[T]) Ping() { - if throttler.pendingValue != *new(T) { + if throttler.pendingValue != nil { throttler.Notify(throttler.pendingValue) } }