more problems, since state is immutable but some state of agents and
client is not immutable, so it ignored events that were not really
duplicates.
This reverts commit f6b0211336.
50 lines
1008 B
Go
50 lines
1008 B
Go
package throttling
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
)
|
|
|
|
type AsyncThrottler[T any] struct {
|
|
throttler Throttler[T]
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
events chan *T
|
|
ticker *time.Ticker
|
|
}
|
|
|
|
func NewAsyncThrottler[T any](notifier func(t *T),
|
|
minDelay time.Duration,
|
|
pollInterval time.Duration) *AsyncThrottler[T] {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
throttler := AsyncThrottler[T]{
|
|
throttler: NewThrottler[T](notifier, minDelay),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
events: make(chan *T),
|
|
ticker: time.NewTicker(pollInterval),
|
|
}
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-throttler.ticker.C:
|
|
throttler.throttler.Ping()
|
|
case event := <-throttler.events:
|
|
throttler.throttler.Notify(event)
|
|
}
|
|
}
|
|
}()
|
|
return &throttler
|
|
}
|
|
|
|
func (throttler *AsyncThrottler[T]) Notify(value *T) {
|
|
throttler.events <- value
|
|
}
|
|
|
|
func (throttler *AsyncThrottler[T]) Stop() {
|
|
throttler.cancel()
|
|
throttler.ticker.Stop()
|
|
}
|