109 lines
2.6 KiB
Go
109 lines
2.6 KiB
Go
package throttling
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
)
|
|
|
|
// Same as Throttler (defined below), but multi-thread safe and
// using an event loop to schedule notifications. This runs its own
// goroutine for scheduling.
|
|
|
|
// _clock abstracts reading the current time so that the time source can be
// swapped out.
type _clock interface {
	time() time.Time
}

// systemClock is the _clock implementation backed by the real system time.
type systemClock struct{}

// time returns the current time as reported by time.Now.
func (systemClock) time() time.Time {
	return time.Now()
}

// clock is the package-wide time source. It is a variable rather than a
// direct call to time.Now so that it can be replaced for testing.
var clock _clock = systemClock{}
|
|
|
|
// Throttler throttles notifications to prometheus and web clients.
//
// It does no locking of its own and is to be used in a single-threaded
// manner.
type Throttler[T any] struct {
	// minDelay is the minimum time that must have passed since the last
	// notification before the next one is sent.
	minDelay time.Duration

	// notifier is the function called to perform the notification.
	notifier func(*T)

	// lastReportedTime is the time recorded after the most recent
	// notification; it is the zero time until one has been sent.
	lastReportedTime time.Time

	// pendingValue is the latest value that has not been sent yet, or nil
	// when nothing is pending.
	pendingValue *T
}
|
|
|
|
func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[T] {
|
|
throttler := Throttler[T]{
|
|
minDelay: minDelay,
|
|
notifier: notifier,
|
|
lastReportedTime: time.Time{},
|
|
pendingValue: nil,
|
|
}
|
|
return throttler
|
|
}
|
|
|
|
// Notify there is a new value. Performs notification if it was long enough ago
|
|
// for the last notification to be sent. If not, it is stored as a pending event to
|
|
// be sent later. New events that come in before a notification is sent override the
|
|
// pending event.
|
|
func (throttler *Throttler[T]) Notify(value *T) {
|
|
if clock.time().Sub(throttler.lastReportedTime) >= throttler.minDelay {
|
|
throttler.notifier(value)
|
|
throttler.lastReportedTime = clock.time()
|
|
throttler.pendingValue = nil
|
|
return
|
|
}
|
|
throttler.pendingValue = value
|
|
}
|
|
|
|
// To be called periodically. It sends out any pending events if the time the last
|
|
// notification was sent is long enough ago.
|
|
func (throttler *Throttler[T]) Ping() {
|
|
if throttler.pendingValue != nil {
|
|
throttler.Notify(throttler.pendingValue)
|
|
}
|
|
}
|
|
|
|
type AsyncThrottler[T any] struct {
|
|
throttler Throttler[T]
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
events chan *T
|
|
ticker *time.Ticker
|
|
}
|
|
|
|
func NewAsyncThrottler[T any](notifier func(t *T),
|
|
minDelay time.Duration,
|
|
pollInterval time.Duration) *AsyncThrottler[T] {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
throttler := AsyncThrottler[T]{
|
|
throttler: NewThrottler[T](notifier, minDelay),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
events: make(chan *T),
|
|
ticker: time.NewTicker(pollInterval),
|
|
}
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-throttler.ticker.C:
|
|
throttler.throttler.Ping()
|
|
case event := <-throttler.events:
|
|
throttler.throttler.Notify(event)
|
|
}
|
|
}
|
|
}()
|
|
return &throttler
|
|
}
|
|
|
|
func (throttler *AsyncThrottler[T]) Notify(value *T) {
|
|
throttler.events <- value
|
|
}
|
|
|
|
func (throttler *AsyncThrottler[T]) Stop() {
|
|
throttler.cancel()
|
|
throttler.ticker.Stop()
|
|
}
|