some moving around of files in the throttling package to make the
structure more clear.
This commit is contained in:
		
							parent
							
								
									4444d6bbcd
								
							
						
					
					
						commit
						c1ade0408d
					
				
							
								
								
									
										49
									
								
								pkg/support/throttling/async_throttler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								pkg/support/throttling/async_throttler.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,49 @@ | |||||||
|  | package throttling | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type AsyncThrottler[T any] struct { | ||||||
|  | 	throttler Throttler[T] | ||||||
|  | 	ctx       context.Context | ||||||
|  | 	cancel    context.CancelFunc | ||||||
|  | 	events    chan *T | ||||||
|  | 	ticker    *time.Ticker | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewAsyncThrottler[T any](notifier func(t *T), | ||||||
|  | 	minDelay time.Duration, | ||||||
|  | 	pollInterval time.Duration) *AsyncThrottler[T] { | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	throttler := AsyncThrottler[T]{ | ||||||
|  | 		throttler: NewThrottler[T](notifier, minDelay), | ||||||
|  | 		ctx:       ctx, | ||||||
|  | 		cancel:    cancel, | ||||||
|  | 		events:    make(chan *T), | ||||||
|  | 		ticker:    time.NewTicker(pollInterval), | ||||||
|  | 	} | ||||||
|  | 	go func() { | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-ctx.Done(): | ||||||
|  | 				return | ||||||
|  | 			case <-throttler.ticker.C: | ||||||
|  | 				throttler.throttler.Ping() | ||||||
|  | 			case event := <-throttler.events: | ||||||
|  | 				throttler.throttler.Notify(event) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 	return &throttler | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (throttler *AsyncThrottler[T]) Notify(value *T) { | ||||||
|  | 	throttler.events <- value | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (throttler *AsyncThrottler[T]) Stop() { | ||||||
|  | 	throttler.cancel() | ||||||
|  | 	throttler.ticker.Stop() | ||||||
|  | } | ||||||
							
								
								
									
										20
									
								
								pkg/support/throttling/clock.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								pkg/support/throttling/clock.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,20 @@ | |||||||
|  | package throttling | ||||||
|  | 
 | ||||||
|  | import "time" | ||||||
|  | 
 | ||||||
|  | // Same as Throttler above but multi-thread safe and
 | ||||||
|  | // using a event loop to scheduole notifications. THis runs its own
 | ||||||
|  | // go routine for scheduling
 | ||||||
|  | 
 | ||||||
|  | type _clock interface { | ||||||
|  | 	time() time.Time | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type systemClock struct{} | ||||||
|  | 
 | ||||||
|  | func (clock systemClock) time() time.Time { | ||||||
|  | 	return time.Now() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Default clock, can be overriden in test cases.
 | ||||||
|  | var clock _clock = systemClock{} | ||||||
| @ -16,6 +16,8 @@ func (t *testClock) time() time.Time { | |||||||
| 
 | 
 | ||||||
| // Set this value to obtain a new value for the current time.
 | // Set this value to obtain a new value for the current time.
 | ||||||
| // This allows testing various scenario's with timing.
 | // This allows testing various scenario's with timing.
 | ||||||
|  | //
 | ||||||
|  | // Simply:    currentTime.now = ....
 | ||||||
| var currentTime = &testClock{} | var currentTime = &testClock{} | ||||||
| 
 | 
 | ||||||
| func TestMain(m *testing.M) { | func TestMain(m *testing.M) { | ||||||
| @ -1,27 +1,9 @@ | |||||||
| package throttling | package throttling | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" |  | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // Same as Throttler above but multi-thread safe and
 |  | ||||||
| // using a event loop to scheduole notifications. THis runs its own
 |  | ||||||
| // go routine for scheduling
 |  | ||||||
| 
 |  | ||||||
| type _clock interface { |  | ||||||
| 	time() time.Time |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type systemClock struct{} |  | ||||||
| 
 |  | ||||||
| func (clock systemClock) time() time.Time { |  | ||||||
| 	return time.Now() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Used for testing:
 |  | ||||||
| var clock _clock = systemClock{} |  | ||||||
| 
 |  | ||||||
| // Throttling notifications to prometheus and web clients
 | // Throttling notifications to prometheus and web clients
 | ||||||
| // TO be used in a single-threaded manner.
 | // TO be used in a single-threaded manner.
 | ||||||
| type Throttler[T any] struct { | type Throttler[T any] struct { | ||||||
| @ -63,46 +45,3 @@ func (throttler *Throttler[T]) Ping() { | |||||||
| 		throttler.Notify(throttler.pendingValue) | 		throttler.Notify(throttler.pendingValue) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 |  | ||||||
| type AsyncThrottler[T any] struct { |  | ||||||
| 	throttler Throttler[T] |  | ||||||
| 	ctx       context.Context |  | ||||||
| 	cancel    context.CancelFunc |  | ||||||
| 	events    chan *T |  | ||||||
| 	ticker    *time.Ticker |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func NewAsyncThrottler[T any](notifier func(t *T), |  | ||||||
| 	minDelay time.Duration, |  | ||||||
| 	pollInterval time.Duration) *AsyncThrottler[T] { |  | ||||||
| 	ctx, cancel := context.WithCancel(context.Background()) |  | ||||||
| 	throttler := AsyncThrottler[T]{ |  | ||||||
| 		throttler: NewThrottler[T](notifier, minDelay), |  | ||||||
| 		ctx:       ctx, |  | ||||||
| 		cancel:    cancel, |  | ||||||
| 		events:    make(chan *T), |  | ||||||
| 		ticker:    time.NewTicker(pollInterval), |  | ||||||
| 	} |  | ||||||
| 	go func() { |  | ||||||
| 		for { |  | ||||||
| 			select { |  | ||||||
| 			case <-ctx.Done(): |  | ||||||
| 				return |  | ||||||
| 			case <-throttler.ticker.C: |  | ||||||
| 				throttler.throttler.Ping() |  | ||||||
| 			case event := <-throttler.events: |  | ||||||
| 				throttler.throttler.Notify(event) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 	return &throttler |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (throttler *AsyncThrottler[T]) Notify(value *T) { |  | ||||||
| 	throttler.events <- value |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (throttler *AsyncThrottler[T]) Stop() { |  | ||||||
| 	throttler.cancel() |  | ||||||
| 	throttler.ticker.Stop() |  | ||||||
| } |  | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user