package throttling import ( "context" "time" ) // Same as Throttler above but multi-thread safe and // using a event loop to scheduole notifications. THis runs its own // go routine for scheduling type _clock interface { time() time.Time } type systemClock struct{} func (clock systemClock) time() time.Time { return time.Now() } // Used for testing: var clock _clock = systemClock{} // Throttling notifications to prometheus and web clients // TO be used in a single-threaded manner. type Throttler[T any] struct { minDelay time.Duration // ucntion to call to implement the notification. notifier func(t *T) lastReportedTime time.Time pendingValue *T } func NewThrottler[T any](notifier func(t *T), minDelay time.Duration) Throttler[T] { throttler := Throttler[T]{ minDelay: minDelay, notifier: notifier, lastReportedTime: time.Time{}, pendingValue: nil, } return throttler } // Notify there is a new value. Performs notification if it was long enough ago // for the last notification to be sent. If not, it is stored as a pending event to // be sent later. New events that come in before a notification is sent override the // pending event. func (throttler *Throttler[T]) Notify(value *T) { if clock.time().Sub(throttler.lastReportedTime) >= throttler.minDelay { throttler.notifier(value) throttler.lastReportedTime = clock.time() throttler.pendingValue = nil return } throttler.pendingValue = value } // To be called periodically. It sends out any pending events if the time the last // notification was sent is long enough ago. func (throttler *Throttler[T]) Ping() { if throttler.pendingValue != nil { throttler.Notify(throttler.pendingValue) } } type AsyncThrottler[T any] struct { throttler Throttler[T] ctx context.Context cancel context.CancelFunc events chan *T ticker *time.Ticker } func NewAsyncThrottler[T any](notifier func(t *T), minDelay time.Duration, pollInterval time.Duration) *AsyncThrottler[T] { ctx, cancel := context.WithCancel(context.Background()) throttler := AsyncThrottler[T]{ throttler: NewThrottler[T](notifier, minDelay), ctx: ctx, cancel: cancel, events: make(chan *T), ticker: time.NewTicker(pollInterval), } go func() { for { select { case <-ctx.Done(): return case <-throttler.ticker.C: throttler.throttler.Ping() case event := <-throttler.events: throttler.throttler.Notify(event) } } }() return &throttler } func (throttler *AsyncThrottler[T]) Notify(value *T) { throttler.events <- value } func (throttler *AsyncThrottler[T]) Stop() { throttler.cancel() throttler.ticker.Stop() }