50 lines
		
	
	
		
			1008 B
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			50 lines
		
	
	
		
			1008 B
		
	
	
	
		
			Go
		
	
	
	
	
	
package throttling
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
type AsyncThrottler[T any] struct {
 | 
						|
	throttler Throttler[T]
 | 
						|
	ctx       context.Context
 | 
						|
	cancel    context.CancelFunc
 | 
						|
	events    chan *T
 | 
						|
	ticker    *time.Ticker
 | 
						|
}
 | 
						|
 | 
						|
func NewAsyncThrottler[T any](notifier func(t *T),
 | 
						|
	minDelay time.Duration,
 | 
						|
	pollInterval time.Duration) *AsyncThrottler[T] {
 | 
						|
	ctx, cancel := context.WithCancel(context.Background())
 | 
						|
	throttler := AsyncThrottler[T]{
 | 
						|
		throttler: NewThrottler[T](notifier, minDelay),
 | 
						|
		ctx:       ctx,
 | 
						|
		cancel:    cancel,
 | 
						|
		events:    make(chan *T),
 | 
						|
		ticker:    time.NewTicker(pollInterval),
 | 
						|
	}
 | 
						|
	go func() {
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				return
 | 
						|
			case <-throttler.ticker.C:
 | 
						|
				throttler.throttler.Ping()
 | 
						|
			case event := <-throttler.events:
 | 
						|
				throttler.throttler.Notify(event)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return &throttler
 | 
						|
}
 | 
						|
 | 
						|
func (throttler *AsyncThrottler[T]) Notify(value *T) {
 | 
						|
	throttler.events <- value
 | 
						|
}
 | 
						|
 | 
						|
func (throttler *AsyncThrottler[T]) Stop() {
 | 
						|
	throttler.cancel()
 | 
						|
	throttler.ticker.Stop()
 | 
						|
}
 |