From 60cda211efcd5ae5d09950e3864f00915786cf86 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sun, 18 Aug 2024 20:56:22 +0200 Subject: [PATCH] some moving around of files in the throttling package to make the structure more clear. --- pkg/support/throttling/async_throttler.go | 49 +++++++++++++++ pkg/support/throttling/clock.go | 20 ++++++ .../{utils_test.go => cloek_test.go} | 2 + pkg/support/throttling/throttler.go | 61 ------------------- 4 files changed, 71 insertions(+), 61 deletions(-) create mode 100644 pkg/support/throttling/async_throttler.go create mode 100644 pkg/support/throttling/clock.go rename pkg/support/throttling/{utils_test.go => cloek_test.go} (91%) diff --git a/pkg/support/throttling/async_throttler.go b/pkg/support/throttling/async_throttler.go new file mode 100644 index 0000000..f9c6dc3 --- /dev/null +++ b/pkg/support/throttling/async_throttler.go @@ -0,0 +1,49 @@ +package throttling + +import ( + "context" + "time" +) + +type AsyncThrottler[T any] struct { + throttler Throttler[T] + ctx context.Context + cancel context.CancelFunc + events chan *T + ticker *time.Ticker +} + +func NewAsyncThrottler[T any](notifier func(t *T), + minDelay time.Duration, + pollInterval time.Duration) *AsyncThrottler[T] { + ctx, cancel := context.WithCancel(context.Background()) + throttler := AsyncThrottler[T]{ + throttler: NewThrottler[T](notifier, minDelay), + ctx: ctx, + cancel: cancel, + events: make(chan *T), + ticker: time.NewTicker(pollInterval), + } + go func() { + for { + select { + case <-ctx.Done(): + return + case <-throttler.ticker.C: + throttler.throttler.Ping() + case event := <-throttler.events: + throttler.throttler.Notify(event) + } + } + }() + return &throttler +} + +func (throttler *AsyncThrottler[T]) Notify(value *T) { + throttler.events <- value +} + +func (throttler *AsyncThrottler[T]) Stop() { + throttler.cancel() + throttler.ticker.Stop() +} diff --git a/pkg/support/throttling/clock.go b/pkg/support/throttling/clock.go new file mode 100644 index 0000000..4071903 --- /dev/null +++ b/pkg/support/throttling/clock.go @@ -0,0 +1,20 @@ +package throttling + +import "time" + +// Same as Throttler above but multi-thread safe and +// using a event loop to scheduole notifications. THis runs its own +// go routine for scheduling + +type _clock interface { + time() time.Time +} + +type systemClock struct{} + +func (clock systemClock) time() time.Time { + return time.Now() +} + +// Default clock, can be overriden in test cases. +var clock _clock = systemClock{} diff --git a/pkg/support/throttling/utils_test.go b/pkg/support/throttling/cloek_test.go similarity index 91% rename from pkg/support/throttling/utils_test.go rename to pkg/support/throttling/cloek_test.go index 54cb23d..1acbd49 100644 --- a/pkg/support/throttling/utils_test.go +++ b/pkg/support/throttling/cloek_test.go @@ -16,6 +16,8 @@ func (t *testClock) time() time.Time { // Set this value to obtain a new value for the current time. // This allows testing various scenario's with timing. +// +// Simply: currentTime.now = .... var currentTime = &testClock{} func TestMain(m *testing.M) { diff --git a/pkg/support/throttling/throttler.go b/pkg/support/throttling/throttler.go index e2fd4ec..a890dc3 100644 --- a/pkg/support/throttling/throttler.go +++ b/pkg/support/throttling/throttler.go @@ -1,27 +1,9 @@ package throttling import ( - "context" "time" ) -// Same as Throttler above but multi-thread safe and -// using a event loop to scheduole notifications. THis runs its own -// go routine for scheduling - -type _clock interface { - time() time.Time -} - -type systemClock struct{} - -func (clock systemClock) time() time.Time { - return time.Now() -} - -// Used for testing: -var clock _clock = systemClock{} - // Throttling notifications to prometheus and web clients // TO be used in a single-threaded manner. type Throttler[T any] struct { @@ -63,46 +45,3 @@ func (throttler *Throttler[T]) Ping() { throttler.Notify(throttler.pendingValue) } } - -type AsyncThrottler[T any] struct { - throttler Throttler[T] - ctx context.Context - cancel context.CancelFunc - events chan *T - ticker *time.Ticker -} - -func NewAsyncThrottler[T any](notifier func(t *T), - minDelay time.Duration, - pollInterval time.Duration) *AsyncThrottler[T] { - ctx, cancel := context.WithCancel(context.Background()) - throttler := AsyncThrottler[T]{ - throttler: NewThrottler[T](notifier, minDelay), - ctx: ctx, - cancel: cancel, - events: make(chan *T), - ticker: time.NewTicker(pollInterval), - } - go func() { - for { - select { - case <-ctx.Done(): - return - case <-throttler.ticker.C: - throttler.throttler.Ping() - case event := <-throttler.events: - throttler.throttler.Notify(event) - } - } - }() - return &throttler -} - -func (throttler *AsyncThrottler[T]) Notify(value *T) { - throttler.events <- value -} - -func (throttler *AsyncThrottler[T]) Stop() { - throttler.cancel() - throttler.ticker.Stop() -}