From 52160a368cea1ea2b0ec263274f23201d441cd47 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Fri, 16 Aug 2024 21:08:44 +0200 Subject: [PATCH] more generalization of how time is handled in the tests. Asynchronous variant that is easier to use and multi-thread safe. --- cmd/converge/converge.go | 8 +- cmd/converge/notifier.go | 24 ++---- .../throttling/async_throttler_test.go | 70 ++++++++++++++++ pkg/support/throttling/throtller_test.go | 29 ++++--- pkg/support/throttling/throttler.go | 81 ++++++++++++++++--- pkg/support/throttling/utils_test.go | 27 +++++++ 6 files changed, 197 insertions(+), 42 deletions(-) create mode 100644 pkg/support/throttling/async_throttler_test.go create mode 100644 pkg/support/throttling/utils_test.go diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go index b42241a..93d7c89 100644 --- a/cmd/converge/converge.go +++ b/cmd/converge/converge.go @@ -12,8 +12,8 @@ import ( _ "net/http/pprof" "os" "regexp" - "strconv" "strings" + "time" _ "time/tzdata" ) @@ -57,7 +57,7 @@ func printHelp(msg string) { " trailing slash. \n" + "--pprof: Enable the pprof endpoint at /debug/pprof\n" + "--throttling-interval: Minimum delay between notificaitons ot users and more updating \n" + - " prometheus statistics (default 2s). " + " prometheus statistics (default 1s). " fmt.Fprintln(os.Stderr, helpText) os.Exit(1) } @@ -75,7 +75,7 @@ func main() { staticdir := "../static" contextpath := "/" pprof := false - throttlingInterval := 2.0 + throttlingInterval := 1 * time.Second args := os.Args[1:] for len(args) > 0 && strings.HasPrefix(args[0], "-") { @@ -105,7 +105,7 @@ func main() { printHelp("The --throttling-interval option expects an argument") } var err error - throttlingInterval, err = strconv.ParseFloat(args[1], 64) + throttlingInterval, err = time.ParseDuration(args[1]) if err != nil { printHelp(fmt.Sprintf("Invalid value for throttling interval '%s;", args[1])) } diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go index 869db50..168b494 100644 --- a/cmd/converge/notifier.go +++ b/cmd/converge/notifier.go @@ -7,37 +7,23 @@ import ( ) type StateNotifier struct { - throttler throttling.Throttler[models.State] - eventChannel chan *models.State + throttler *throttling.AsyncThrottler[models.State] webNotificationChannel chan *models.State prometheusNotificationChannel chan *models.State } -func NewStateNotifier(minDelaySeconds float64) *StateNotifier { +func NewStateNotifier(minDelay time.Duration) *StateNotifier { notifier := StateNotifier{ - eventChannel: make(chan *models.State), webNotificationChannel: make(chan *models.State), prometheusNotificationChannel: make(chan *models.State), } - notifier.throttler = throttling.NewThrottler(func(state *models.State) { + notifier.throttler = throttling.NewAsyncThrottler(func(state *models.State) { notifier.webNotificationChannel <- state notifier.prometheusNotificationChannel <- state - }, minDelaySeconds) - - ticker := time.NewTicker(1 * time.Second) - go func() { - for { - select { - case <-ticker.C: - notifier.throttler.Ping(time.Now()) - case state := <-notifier.eventChannel: - notifier.throttler.Notify(time.Now(), state) - } - } - }() + }, minDelay, 1*time.Second) return ¬ifier } func (notifier StateNotifier) Publish(state *models.State) { - notifier.eventChannel <- state + notifier.throttler.Notify(state) } diff --git a/pkg/support/throttling/async_throttler_test.go b/pkg/support/throttling/async_throttler_test.go new file mode 100644 index 0000000..1977803 --- /dev/null +++ b/pkg/support/throttling/async_throttler_test.go @@ -0,0 +1,70 @@ +package throttling + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func Test_AsyncDeliverOneValue(t *testing.T) { + value := 0 + pollInterval := 10 * time.Millisecond + + throttler := NewAsyncThrottler[int](func(v *int) { + value = *v + }, time.Second, pollInterval) + defer throttler.Stop() + + t0 := time.Now() + v := 1 + currentTime.now = t0 + throttler.Notify(&v) + time.Sleep(2 * pollInterval) + assert.Equal(t, v, value) + + // subsequent ping will not lead to a notification + value = 0 + time.Sleep(2 * pollInterval) + assert.Equal(t, 0, value) +} + +func Test_AsyncTwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay(t *testing.T) { + value := 0 + delayMs := 1000 + + pollInterval := 10 * time.Millisecond + + throttler := NewAsyncThrottler[int](func(v *int) { + value = *v + }, time.Second, pollInterval) + defer throttler.Stop() + + t0 := time.Now() + v1 := 1 + // v2 will not be delivered, the last value in the time interval will be + v2 := 2 + v3 := 3 + currentTime.now = t0 + throttler.Notify(&v1) + time.Sleep(2 * pollInterval) + assert.Equal(t, v1, value) + + throttler.Notify(&v2) + throttler.Notify(&v3) + time.Sleep(2 * pollInterval) + assert.Equal(t, v1, value) + + currentTime.now = t0.Add(time.Duration(delayMs-1) * time.Millisecond) + time.Sleep(2 * pollInterval) + assert.Equal(t, v1, value) + + currentTime.now = t0.Add(time.Duration(delayMs) * time.Millisecond) + time.Sleep(2 * pollInterval) + assert.Equal(t, v3, value) + + // another ping won't deliver the same value again. + value = 0 + currentTime.now = t0.Add(time.Duration(delayMs) * time.Millisecond) + time.Sleep(2 * pollInterval) + assert.Equal(t, 0, value) +} diff --git a/pkg/support/throttling/throtller_test.go b/pkg/support/throttling/throtller_test.go index 9c5fba7..9672809 100644 --- a/pkg/support/throttling/throtller_test.go +++ b/pkg/support/throttling/throtller_test.go @@ -14,11 +14,13 @@ func Test_throttlerImmediateNotificationAfterInitialized(t *testing.T) { t0 := time.Now() v := 1 - throttler.Notify(t0, &v) + currentTime.now = t0 + throttler.Notify(&v) assert.Equal(t, v, value) value = 0 // subsequent ping will not lead to a notification - throttler.Ping(t0.Add(10 * time.Second)) + currentTime.now = t0.Add(10 * time.Second) + throttler.Ping() assert.Equal(t, 0, value) } @@ -27,24 +29,31 @@ func Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay(t *testin delayMs := 1000 throttler := NewThrottler[int](func(v *int) { value = *v - }, float64(delayMs)/1000.0) + }, time.Duration(delayMs)*time.Millisecond) t0 := time.Now() v1 := 1 // v2 will not be delivered, the last value in the time interval will be v2 := 2 v3 := 3 - throttler.Notify(t0, &v1) + currentTime.now = t0 + throttler.Notify(&v1) assert.Equal(t, v1, value) - throttler.Notify(t0, &v2) - throttler.Notify(t0, &v3) + throttler.Notify(&v2) + throttler.Notify(&v3) assert.Equal(t, v1, value) - throttler.Ping(t0.Add(time.Duration(delayMs-1) * time.Millisecond)) + + currentTime.now = t0.Add(time.Duration(delayMs-1) * time.Millisecond) + throttler.Ping() assert.Equal(t, v1, value) - throttler.Ping(t0.Add(time.Duration(delayMs) * time.Millisecond)) + + currentTime.now = t0.Add(time.Duration(delayMs) * time.Millisecond) + throttler.Ping() assert.Equal(t, v3, value) + + // another ping won't deliver the same value again. value = 0 - // another ping won' deliver the same value again. - throttler.Ping(t0.Add(time.Duration(delayMs*2) * time.Millisecond)) + currentTime.now = t0.Add(time.Duration(delayMs) * time.Millisecond) + throttler.Ping() assert.Equal(t, 0, value) } diff --git a/pkg/support/throttling/throttler.go b/pkg/support/throttling/throttler.go index 345f582..e2fd4ec 100644 --- a/pkg/support/throttling/throttler.go +++ b/pkg/support/throttling/throttler.go @@ -1,20 +1,40 @@ package throttling -import "time" +import ( + "context" + "time" +) + +// Same as Throttler above but multi-thread safe and +// using a event loop to scheduole notifications. THis runs its own +// go routine for scheduling + +type _clock interface { + time() time.Time +} + +type systemClock struct{} + +func (clock systemClock) time() time.Time { + return time.Now() +} + +// Used for testing: +var clock _clock = systemClock{} // Throttling notifications to prometheus and web clients // TO be used in a single-threaded manner. type Throttler[T any] struct { - minDelaySeconds float64 + minDelay time.Duration // ucntion to call to implement the notification. notifier func(t *T) lastReportedTime time.Time pendingValue *T } -func NewThrottler[T any](notifier func(t *T), minDelaySeconds float64) Throttler[T] { +func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[T] { throttler := Throttler[T]{ - minDelaySeconds: minDelaySeconds, + minDelay: minDelay, notifier: notifier, lastReportedTime: time.Time{}, pendingValue: nil, @@ -26,10 +46,10 @@ func NewThrottler[T any](notifier func(t *T), minDelaySeconds float64) Throttler // for the last notification to be sent. If not, it is stored as a pending event to // be sent later. New events that come in before a notification is sent override the // pending event. -func (throttler *Throttler[T]) Notify(time time.Time, value *T) { - if time.Sub(throttler.lastReportedTime).Seconds() >= throttler.minDelaySeconds { +func (throttler *Throttler[T]) Notify(value *T) { + if clock.time().Sub(throttler.lastReportedTime) >= throttler.minDelay { throttler.notifier(value) - throttler.lastReportedTime = time + throttler.lastReportedTime = clock.time() throttler.pendingValue = nil return } @@ -38,8 +58,51 @@ func (throttler *Throttler[T]) Notify(time time.Time, value *T) { // To be called periodically. It sends out any pending events if the time the last // notification was sent is long enough ago. -func (throttler *Throttler[T]) Ping(time time.Time) { +func (throttler *Throttler[T]) Ping() { if throttler.pendingValue != nil { - throttler.Notify(time, throttler.pendingValue) + throttler.Notify(throttler.pendingValue) } } + +type AsyncThrottler[T any] struct { + throttler Throttler[T] + ctx context.Context + cancel context.CancelFunc + events chan *T + ticker *time.Ticker +} + +func NewAsyncThrottler[T any](notifier func(t *T), + minDelay time.Duration, + pollInterval time.Duration) *AsyncThrottler[T] { + ctx, cancel := context.WithCancel(context.Background()) + throttler := AsyncThrottler[T]{ + throttler: NewThrottler[T](notifier, minDelay), + ctx: ctx, + cancel: cancel, + events: make(chan *T), + ticker: time.NewTicker(pollInterval), + } + go func() { + for { + select { + case <-ctx.Done(): + return + case <-throttler.ticker.C: + throttler.throttler.Ping() + case event := <-throttler.events: + throttler.throttler.Notify(event) + } + } + }() + return &throttler +} + +func (throttler *AsyncThrottler[T]) Notify(value *T) { + throttler.events <- value +} + +func (throttler *AsyncThrottler[T]) Stop() { + throttler.cancel() + throttler.ticker.Stop() +} diff --git a/pkg/support/throttling/utils_test.go b/pkg/support/throttling/utils_test.go new file mode 100644 index 0000000..54cb23d --- /dev/null +++ b/pkg/support/throttling/utils_test.go @@ -0,0 +1,27 @@ +package throttling + +import ( + "os" + "testing" + "time" +) + +type testClock struct { + now time.Time +} + +func (t *testClock) time() time.Time { + return t.now +} + +// Set this value to obtain a new value for the current time. +// This allows testing various scenario's with timing. +var currentTime = &testClock{} + +func TestMain(m *testing.M) { + oldclock := clock + clock = currentTime + exitCode := m.Run() + clock = oldclock + os.Exit(exitCode) +}