reverted changes to throttler for ignoring duplicate events. This caused

more problems since state is immutable but some state of agents and
client is not immutable so it ignored events that were not really
duplicates.
This reverts commit f6b0211336.
This commit is contained in:
Erik Brakkee 2024-08-24 23:22:48 +02:00
parent 474ac5ec30
commit db094d2a13
5 changed files with 30 additions and 67 deletions

View File

@ -7,7 +7,7 @@ import (
)
type StateNotifier struct {
throttler *throttling.AsyncThrottler[*models.State]
throttler *throttling.AsyncThrottler[models.State]
webNotificationChannel chan *models.State
prometheusNotificationChannel chan *models.State
}

View File

@ -5,15 +5,15 @@ import (
"time"
)
type AsyncThrottler[T comparable] struct {
type AsyncThrottler[T any] struct {
throttler Throttler[T]
ctx context.Context
cancel context.CancelFunc
events chan T
events chan *T
ticker *time.Ticker
}
func NewAsyncThrottler[T comparable](notifier func(t T),
func NewAsyncThrottler[T any](notifier func(t *T),
minDelay time.Duration,
pollInterval time.Duration) *AsyncThrottler[T] {
ctx, cancel := context.WithCancel(context.Background())
@ -21,7 +21,7 @@ func NewAsyncThrottler[T comparable](notifier func(t T),
throttler: NewThrottler[T](notifier, minDelay),
ctx: ctx,
cancel: cancel,
events: make(chan T),
events: make(chan *T),
ticker: time.NewTicker(pollInterval),
}
go func() {
@ -39,7 +39,7 @@ func NewAsyncThrottler[T comparable](notifier func(t T),
return &throttler
}
func (throttler *AsyncThrottler[T]) Notify(value T) {
func (throttler *AsyncThrottler[T]) Notify(value *T) {
throttler.events <- value
}

View File

@ -9,15 +9,15 @@ func (suite *ThrottlerTestSuite) Test_AsyncDeliverOneValue() {
pollInterval := 10 * time.Millisecond
throttler := NewAsyncThrottler[int](func(v int) {
value = v
throttler := NewAsyncThrottler[int](func(v *int) {
value = *v
}, time.Second, pollInterval)
defer throttler.Stop()
t0 := time.Now()
v := 1
currentTime.now = t0
throttler.Notify(v)
throttler.Notify(&v)
time.Sleep(2 * pollInterval)
suite.Equal(v, value)
@ -33,8 +33,8 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon
pollInterval := 10 * time.Millisecond
throttler := NewAsyncThrottler[int](func(v int) {
value = v
throttler := NewAsyncThrottler[int](func(v *int) {
value = *v
}, time.Second, pollInterval)
defer throttler.Stop()
@ -44,12 +44,12 @@ func (suite *ThrottlerTestSuite) Test_AsyncTwoNotificationsInSHortSucessionSecon
v2 := 2
v3 := 3
currentTime.now = t0
throttler.Notify(v1)
throttler.Notify(&v1)
time.Sleep(2 * pollInterval)
suite.Equal(v1, value)
throttler.Notify(v2)
throttler.Notify(v3)
throttler.Notify(&v2)
throttler.Notify(&v3)
time.Sleep(2 * pollInterval)
suite.Equal(v1, value)

View File

@ -6,14 +6,14 @@ import (
func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitialized() {
value := 0
throttler := NewThrottler[int](func(v int) {
value = v
throttler := NewThrottler[int](func(v *int) {
value = *v
}, 1.0)
t0 := time.Now()
v := 1
currentTime.now = t0
throttler.Notify(v)
throttler.Notify(&v)
suite.Equal(v, value)
value = 0
// subsequent ping will not lead to a notification
@ -25,8 +25,8 @@ func (suite *ThrottlerTestSuite) Test_throttlerImmediateNotificationAfterInitial
func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay() {
value := 0
delayMs := 1000
throttler := NewThrottler[int](func(v int) {
value = v
throttler := NewThrottler[int](func(v *int) {
value = *v
}, time.Duration(delayMs)*time.Millisecond)
t0 := time.Now()
@ -35,10 +35,10 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI
v2 := 2
v3 := 3
currentTime.now = t0
throttler.Notify(v1)
throttler.Notify(&v1)
suite.Equal(v1, value)
throttler.Notify(v2)
throttler.Notify(v3)
throttler.Notify(&v2)
throttler.Notify(&v3)
suite.Equal(v1, value)
currentTime.now = t0.Add(time.Duration(delayMs-1) * time.Millisecond)
@ -55,29 +55,3 @@ func (suite *ThrottlerTestSuite) Test_TwoNotificationsInSHortSucessionSecondOneI
throttler.Ping()
suite.Equal(0, value)
}
func (suite *ThrottlerTestSuite) Test_DuplicateEventsIgnored() {
value := 0
mindelay := 1 * time.Second
throttler := NewThrottler[int](func(v int) {
value = v
}, mindelay)
t0 := time.Now()
v := 1
currentTime.now = t0
throttler.Notify(v)
suite.Equal(v, value)
// sending the same value again will not lead to a notification
value = 0
currentTime.now = currentTime.now.Add(2 * mindelay)
throttler.Notify(v)
suite.Equal(0, value)
// sending a different value later will lead to a new notification
currentTime.now = currentTime.now.Add(2 * mindelay)
v++
throttler.Notify(v)
suite.Equal(v, value)
}

View File

@ -6,26 +6,20 @@ import (
// Throttling notifications to prometheus and web clients
// TO be used in a single-threaded manner.
//
// NOTE: the null value of type T is considered as a not-set value.
//
// The best usage of Throttler is with a pointer type.
type Throttler[T comparable] struct {
type Throttler[T any] struct {
minDelay time.Duration
// ucntion to call to implement the notification.
notifier func(t T)
notifier func(t *T)
lastReportedTime time.Time
pendingValue T
lastValue T
pendingValue *T
}
func NewThrottler[T comparable](notifier func(t T), minDelay time.Duration) Throttler[T] {
func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[T] {
throttler := Throttler[T]{
minDelay: minDelay,
notifier: notifier,
lastReportedTime: time.Time{},
pendingValue: *new(T),
lastValue: *new(T),
pendingValue: nil,
}
return throttler
}
@ -34,16 +28,11 @@ func NewThrottler[T comparable](notifier func(t T), minDelay time.Duration) Thro
// for the last notification to be sent. If not, it is stored as a pending event to
// be sent later. New events that come in before a notification is sent override the
// pending event.
func (throttler *Throttler[T]) Notify(value T) {
if throttler.lastValue == value {
// duplicate events are not sent.
return
}
func (throttler *Throttler[T]) Notify(value *T) {
if clock.time().Sub(throttler.lastReportedTime) >= throttler.minDelay {
throttler.notifier(value)
throttler.lastReportedTime = clock.time()
throttler.lastValue = value
throttler.pendingValue = *new(T)
throttler.pendingValue = nil
return
}
throttler.pendingValue = value
@ -52,7 +41,7 @@ func (throttler *Throttler[T]) Notify(value T) {
// To be called periodically. It sends out any pending events if the time the last
// notification was sent is long enough ago.
func (throttler *Throttler[T]) Ping() {
if throttler.pendingValue != *new(T) {
if throttler.pendingValue != nil {
throttler.Notify(throttler.pendingValue)
}
}