From fed2aaeaf9790645490d427b9c08f2c87c690d91 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Fri, 16 Aug 2024 14:59:18 +0200 Subject: [PATCH] throttling implemented for events. --- cmd/converge/converge.go | 18 ++++++++++-- cmd/converge/notifier.go | 32 ++++++++++++++++++---- cmd/converge/throttler.go | 45 ++++++++++++++++++++++++++++++ cmd/converge/throttler_test.go | 50 ++++++++++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 7 deletions(-) create mode 100644 cmd/converge/throttler.go create mode 100644 cmd/converge/throttler_test.go diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go index 3f37074..b42241a 100644 --- a/cmd/converge/converge.go +++ b/cmd/converge/converge.go @@ -12,6 +12,7 @@ import ( _ "net/http/pprof" "os" "regexp" + "strconv" "strings" _ "time/tzdata" ) @@ -54,7 +55,9 @@ func printHelp(msg string) { " a different context path. For instance to host converge at a base\n" + " URL of https://example.com/converge/, specify /converge/ (with\n" + " trailing slash. \n" + - "--pprof: Enable the pprof endpoint at /debug/pprof" + "--pprof: Enable the pprof endpoint at /debug/pprof\n" + + "--throttling-interval: Minimum delay between notificaitons ot users and more updating \n" + + " prometheus statistics (default 2s). " fmt.Fprintln(os.Stderr, helpText) os.Exit(1) } @@ -72,6 +75,7 @@ func main() { staticdir := "../static" contextpath := "/" pprof := false + throttlingInterval := 2.0 args := os.Args[1:] for len(args) > 0 && strings.HasPrefix(args[0], "-") { @@ -96,6 +100,16 @@ func main() { args = args[1:] case "--pprof": pprof = true + case "--throttling-interval": + if len(args) <= 1 { + printHelp("The --throttling-interval option expects an argument") + } + var err error + throttlingInterval, err = strconv.ParseFloat(args[1], 64) + if err != nil { + printHelp(fmt.Sprintf("Invalid value for throttling interval '%s;", args[1])) + } + args = args[1:] default: printHelp("Unknown option " + args[0]) } @@ -112,7 +126,7 @@ func main() { // Prometheus // And the MatchMaker. The MatchMakers sends state notifications to websessions // and prometheus. - notifications := NewStateNotifier() + notifications := NewStateNotifier(throttlingInterval) websessions := matchmaker.NewWebSessions(notifications.webNotificationChannel) // monitoring prometheusMux := http.NewServeMux() diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go index 241b804..72106a6 100644 --- a/cmd/converge/notifier.go +++ b/cmd/converge/notifier.go @@ -1,20 +1,42 @@ package main -import "git.wamblee.org/converge/pkg/models" +import ( + "git.wamblee.org/converge/pkg/models" + "time" +) type StateNotifier struct { + throttler Throttler[models.State] + eventChannel chan *models.State webNotificationChannel chan *models.State prometheusNotificationChannel chan *models.State } -func NewStateNotifier() *StateNotifier { - return &StateNotifier{ +func NewStateNotifier(minDelaySeconds float64) *StateNotifier { + notifier := StateNotifier{ + eventChannel: make(chan *models.State), webNotificationChannel: make(chan *models.State), prometheusNotificationChannel: make(chan *models.State), } + notifier.throttler = NewThrottler(func(state *models.State) { + notifier.webNotificationChannel <- state + notifier.prometheusNotificationChannel <- state + }, minDelaySeconds) + + ticker := time.NewTicker(1 * time.Second) + go func() { + for { + select { + case <-ticker.C: + notifier.throttler.ping(time.Now()) + case state := <-notifier.eventChannel: + notifier.throttler.notify(time.Now(), state) + } + } + }() + return ¬ifier } func (notifier StateNotifier) Publish(state *models.State) { - notifier.webNotificationChannel <- state - notifier.prometheusNotificationChannel <- state + notifier.eventChannel <- state } diff --git a/cmd/converge/throttler.go b/cmd/converge/throttler.go new file mode 100644 index 0000000..275085d --- /dev/null +++ b/cmd/converge/throttler.go @@ -0,0 +1,45 @@ +package main + +import "time" + +// Throttling notifications to prometheus and web clients +// TO be used in a single-threaded manner. +type Throttler[T any] struct { + minDelaySeconds float64 + // ucntion to call to implement the notification. + notifier func(t *T) + lastReportedTime time.Time + pendingValue *T +} + +func NewThrottler[T any](notifier func(t *T), minDelaySeconds float64) Throttler[T] { + throttler := Throttler[T]{ + minDelaySeconds: minDelaySeconds, + notifier: notifier, + lastReportedTime: time.Time{}, + pendingValue: nil, + } + return throttler +} + +// Notify there is a new value. Performs notification if it was long enough ago +// for the last notification to be sent. If not, it is stored as a pending event to +// be sent later. New events that come in before a notification is sent override the +// pending event. +func (throttler *Throttler[T]) notify(time time.Time, value *T) { + if time.Sub(throttler.lastReportedTime).Seconds() >= throttler.minDelaySeconds { + throttler.notifier(value) + throttler.lastReportedTime = time + throttler.pendingValue = nil + return + } + throttler.pendingValue = value +} + +// To be called periodically. It sends out any pending events if the time the last +// notification was sent is long enough ago. +func (throttler *Throttler[T]) ping(time time.Time) { + if throttler.pendingValue != nil { + throttler.notify(time, throttler.pendingValue) + } +} diff --git a/cmd/converge/throttler_test.go b/cmd/converge/throttler_test.go new file mode 100644 index 0000000..57abc73 --- /dev/null +++ b/cmd/converge/throttler_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func Test_throttlerImmediateNotificationAfterInitialized(t *testing.T) { + value := 0 + throttler := NewThrottler[int](func(v *int) { + value = *v + }, 1.0) + + t0 := time.Now() + v := 1 + throttler.notify(t0, &v) + assert.Equal(t, v, value) + value = 0 + // subsequent ping will not lead to a notification + throttler.ping(t0.Add(10 * time.Second)) + assert.Equal(t, 0, value) +} + +func Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay(t *testing.T) { + value := 0 + delayMs := 1000 + throttler := NewThrottler[int](func(v *int) { + value = *v + }, float64(delayMs)/1000.0) + + t0 := time.Now() + v1 := 1 + // v2 will not be delivered, the last value in the time interval will be + v2 := 2 + v3 := 3 + throttler.notify(t0, &v1) + assert.Equal(t, v1, value) + throttler.notify(t0, &v2) + throttler.notify(t0, &v3) + assert.Equal(t, v1, value) + throttler.ping(t0.Add(time.Duration(delayMs-1) * time.Millisecond)) + assert.Equal(t, v1, value) + throttler.ping(t0.Add(time.Duration(delayMs) * time.Millisecond)) + assert.Equal(t, v3, value) + value = 0 + // another ping won' deliver the same value again. + throttler.ping(t0.Add(time.Duration(delayMs*2) * time.Millisecond)) + assert.Equal(t, 0, value) +}