moved the throttler to its own package.
This commit is contained in:
parent
3cd2ccf25c
commit
1bf559bb65
@ -2,11 +2,12 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"git.wamblee.org/converge/pkg/models"
|
"git.wamblee.org/converge/pkg/models"
|
||||||
|
"git.wamblee.org/converge/pkg/support/throttling"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StateNotifier struct {
|
type StateNotifier struct {
|
||||||
throttler Throttler[models.State]
|
throttler throttling.Throttler[models.State]
|
||||||
eventChannel chan *models.State
|
eventChannel chan *models.State
|
||||||
webNotificationChannel chan *models.State
|
webNotificationChannel chan *models.State
|
||||||
prometheusNotificationChannel chan *models.State
|
prometheusNotificationChannel chan *models.State
|
||||||
@ -18,7 +19,7 @@ func NewStateNotifier(minDelaySeconds float64) *StateNotifier {
|
|||||||
webNotificationChannel: make(chan *models.State),
|
webNotificationChannel: make(chan *models.State),
|
||||||
prometheusNotificationChannel: make(chan *models.State),
|
prometheusNotificationChannel: make(chan *models.State),
|
||||||
}
|
}
|
||||||
notifier.throttler = NewThrottler(func(state *models.State) {
|
notifier.throttler = throttling.NewThrottler(func(state *models.State) {
|
||||||
notifier.webNotificationChannel <- state
|
notifier.webNotificationChannel <- state
|
||||||
notifier.prometheusNotificationChannel <- state
|
notifier.prometheusNotificationChannel <- state
|
||||||
}, minDelaySeconds)
|
}, minDelaySeconds)
|
||||||
@ -28,9 +29,9 @@ func NewStateNotifier(minDelaySeconds float64) *StateNotifier {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
notifier.throttler.ping(time.Now())
|
notifier.throttler.Ping(time.Now())
|
||||||
case state := <-notifier.eventChannel:
|
case state := <-notifier.eventChannel:
|
||||||
notifier.throttler.notify(time.Now(), state)
|
notifier.throttler.Notify(time.Now(), state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package main
|
package throttling
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -14,11 +14,11 @@ func Test_throttlerImmediateNotificationAfterInitialized(t *testing.T) {
|
|||||||
|
|
||||||
t0 := time.Now()
|
t0 := time.Now()
|
||||||
v := 1
|
v := 1
|
||||||
throttler.notify(t0, &v)
|
throttler.Notify(t0, &v)
|
||||||
assert.Equal(t, v, value)
|
assert.Equal(t, v, value)
|
||||||
value = 0
|
value = 0
|
||||||
// subsequent ping will not lead to a notification
|
// subsequent ping will not lead to a notification
|
||||||
throttler.ping(t0.Add(10 * time.Second))
|
throttler.Ping(t0.Add(10 * time.Second))
|
||||||
assert.Equal(t, 0, value)
|
assert.Equal(t, 0, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,17 +34,17 @@ func Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay(t *testin
|
|||||||
// v2 will not be delivered, the last value in the time interval will be
|
// v2 will not be delivered, the last value in the time interval will be
|
||||||
v2 := 2
|
v2 := 2
|
||||||
v3 := 3
|
v3 := 3
|
||||||
throttler.notify(t0, &v1)
|
throttler.Notify(t0, &v1)
|
||||||
assert.Equal(t, v1, value)
|
assert.Equal(t, v1, value)
|
||||||
throttler.notify(t0, &v2)
|
throttler.Notify(t0, &v2)
|
||||||
throttler.notify(t0, &v3)
|
throttler.Notify(t0, &v3)
|
||||||
assert.Equal(t, v1, value)
|
assert.Equal(t, v1, value)
|
||||||
throttler.ping(t0.Add(time.Duration(delayMs-1) * time.Millisecond))
|
throttler.Ping(t0.Add(time.Duration(delayMs-1) * time.Millisecond))
|
||||||
assert.Equal(t, v1, value)
|
assert.Equal(t, v1, value)
|
||||||
throttler.ping(t0.Add(time.Duration(delayMs) * time.Millisecond))
|
throttler.Ping(t0.Add(time.Duration(delayMs) * time.Millisecond))
|
||||||
assert.Equal(t, v3, value)
|
assert.Equal(t, v3, value)
|
||||||
value = 0
|
value = 0
|
||||||
// another ping won' deliver the same value again.
|
// another ping won' deliver the same value again.
|
||||||
throttler.ping(t0.Add(time.Duration(delayMs*2) * time.Millisecond))
|
throttler.Ping(t0.Add(time.Duration(delayMs*2) * time.Millisecond))
|
||||||
assert.Equal(t, 0, value)
|
assert.Equal(t, 0, value)
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package main
|
package throttling
|
||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
@ -26,7 +26,7 @@ func NewThrottler[T any](notifier func(t *T), minDelaySeconds float64) Throttler
|
|||||||
// for the last notification to be sent. If not, it is stored as a pending event to
|
// for the last notification to be sent. If not, it is stored as a pending event to
|
||||||
// be sent later. New events that come in before a notification is sent override the
|
// be sent later. New events that come in before a notification is sent override the
|
||||||
// pending event.
|
// pending event.
|
||||||
func (throttler *Throttler[T]) notify(time time.Time, value *T) {
|
func (throttler *Throttler[T]) Notify(time time.Time, value *T) {
|
||||||
if time.Sub(throttler.lastReportedTime).Seconds() >= throttler.minDelaySeconds {
|
if time.Sub(throttler.lastReportedTime).Seconds() >= throttler.minDelaySeconds {
|
||||||
throttler.notifier(value)
|
throttler.notifier(value)
|
||||||
throttler.lastReportedTime = time
|
throttler.lastReportedTime = time
|
||||||
@ -38,8 +38,8 @@ func (throttler *Throttler[T]) notify(time time.Time, value *T) {
|
|||||||
|
|
||||||
// To be called periodically. It sends out any pending events if the time the last
|
// To be called periodically. It sends out any pending events if the time the last
|
||||||
// notification was sent is long enough ago.
|
// notification was sent is long enough ago.
|
||||||
func (throttler *Throttler[T]) ping(time time.Time) {
|
func (throttler *Throttler[T]) Ping(time time.Time) {
|
||||||
if throttler.pendingValue != nil {
|
if throttler.pendingValue != nil {
|
||||||
throttler.notify(time, throttler.pendingValue)
|
throttler.Notify(time, throttler.pendingValue)
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user