From 3cd2ccf25c701b9a3a3f27bcfe4c747fdd3a6d4a Mon Sep 17 00:00:00 2001
From: Erik Brakkee <erik@brakkee.org>
Date: Fri, 16 Aug 2024 14:59:18 +0200
Subject: [PATCH] throttling implemented for events.

---
 cmd/converge/converge.go       | 18 ++++++++++--
 cmd/converge/notifier.go       | 32 ++++++++++++++++++----
 cmd/converge/throttler.go      | 45 ++++++++++++++++++++++++++++++
 cmd/converge/throttler_test.go | 50 ++++++++++++++++++++++++++++++++++
 4 files changed, 138 insertions(+), 7 deletions(-)
 create mode 100644 cmd/converge/throttler.go
 create mode 100644 cmd/converge/throttler_test.go

diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go
index 3f37074..b42241a 100644
--- a/cmd/converge/converge.go
+++ b/cmd/converge/converge.go
@@ -12,6 +12,7 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"regexp"
+	"strconv"
 	"strings"
 	_ "time/tzdata"
 )
@@ -54,7 +55,9 @@ func printHelp(msg string) {
 		"                           a different context path. For instance to host converge at a base\n" +
 		"                           URL of https://example.com/converge/, specify /converge/ (with\n" +
 		"                           trailing slash. \n" +
-		"--pprof:                   Enable the pprof endpoint at /debug/pprof"
+		"--pprof:                   Enable the pprof endpoint at /debug/pprof\n" +
+		"--throttling-interval:     Minimum delay between notificaitons ot users and more updating \n" +
+		"                           prometheus statistics (default 2s). "
 	fmt.Fprintln(os.Stderr, helpText)
 	os.Exit(1)
 }
@@ -72,6 +75,7 @@ func main() {
 	staticdir := "../static"
 	contextpath := "/"
 	pprof := false
+	throttlingInterval := 2.0
 
 	args := os.Args[1:]
 	for len(args) > 0 && strings.HasPrefix(args[0], "-") {
@@ -96,6 +100,16 @@ func main() {
 			args = args[1:]
 		case "--pprof":
 			pprof = true
+		case "--throttling-interval":
+			if len(args) <= 1 {
+				printHelp("The --throttling-interval option expects an argument")
+			}
+			var err error
+			throttlingInterval, err = strconv.ParseFloat(args[1], 64)
+			if err != nil {
+				printHelp(fmt.Sprintf("Invalid value for throttling interval '%s;", args[1]))
+			}
+			args = args[1:]
 		default:
 			printHelp("Unknown option " + args[0])
 		}
@@ -112,7 +126,7 @@ func main() {
 	// Prometheus
 	// And the MatchMaker. The MatchMakers sends state notifications to websessions
 	// and prometheus.
-	notifications := NewStateNotifier()
+	notifications := NewStateNotifier(throttlingInterval)
 	websessions := matchmaker.NewWebSessions(notifications.webNotificationChannel)
 	// monitoring
 	prometheusMux := http.NewServeMux()
diff --git a/cmd/converge/notifier.go b/cmd/converge/notifier.go
index 241b804..72106a6 100644
--- a/cmd/converge/notifier.go
+++ b/cmd/converge/notifier.go
@@ -1,20 +1,42 @@
 package main
 
-import "git.wamblee.org/converge/pkg/models"
+import (
+	"git.wamblee.org/converge/pkg/models"
+	"time"
+)
 
 type StateNotifier struct {
+	throttler                     Throttler[models.State]
+	eventChannel                  chan *models.State
 	webNotificationChannel        chan *models.State
 	prometheusNotificationChannel chan *models.State
 }
 
-func NewStateNotifier() *StateNotifier {
-	return &StateNotifier{
+func NewStateNotifier(minDelaySeconds float64) *StateNotifier {
+	notifier := StateNotifier{
+		eventChannel:                  make(chan *models.State),
 		webNotificationChannel:        make(chan *models.State),
 		prometheusNotificationChannel: make(chan *models.State),
 	}
+	notifier.throttler = NewThrottler(func(state *models.State) {
+		notifier.webNotificationChannel <- state
+		notifier.prometheusNotificationChannel <- state
+	}, minDelaySeconds)
+
+	ticker := time.NewTicker(1 * time.Second)
+	go func() {
+		for {
+			select {
+			case <-ticker.C:
+				notifier.throttler.ping(time.Now())
+			case state := <-notifier.eventChannel:
+				notifier.throttler.notify(time.Now(), state)
+			}
+		}
+	}()
+	return &notifier
 }
 
 func (notifier StateNotifier) Publish(state *models.State) {
-	notifier.webNotificationChannel <- state
-	notifier.prometheusNotificationChannel <- state
+	notifier.eventChannel <- state
 }
diff --git a/cmd/converge/throttler.go b/cmd/converge/throttler.go
new file mode 100644
index 0000000..275085d
--- /dev/null
+++ b/cmd/converge/throttler.go
@@ -0,0 +1,45 @@
+package main
+
+import "time"
+
+// Throttling notifications to prometheus and web clients
+// TO be used in a single-threaded manner.
+type Throttler[T any] struct {
+	minDelaySeconds float64
+	// ucntion to call to implement the notification.
+	notifier         func(t *T)
+	lastReportedTime time.Time
+	pendingValue     *T
+}
+
+func NewThrottler[T any](notifier func(t *T), minDelaySeconds float64) Throttler[T] {
+	throttler := Throttler[T]{
+		minDelaySeconds:  minDelaySeconds,
+		notifier:         notifier,
+		lastReportedTime: time.Time{},
+		pendingValue:     nil,
+	}
+	return throttler
+}
+
+// Notify there is a new value. Performs notification if it was long enough ago
+// for the last notification to be sent. If not, it is stored as a pending event to
+// be sent later. New events that come in before a notification is sent override the
+// pending event.
+func (throttler *Throttler[T]) notify(time time.Time, value *T) {
+	if time.Sub(throttler.lastReportedTime).Seconds() >= throttler.minDelaySeconds {
+		throttler.notifier(value)
+		throttler.lastReportedTime = time
+		throttler.pendingValue = nil
+		return
+	}
+	throttler.pendingValue = value
+}
+
+// To be called periodically. It sends out any pending events if the time the last
+// notification was sent is long enough ago.
+func (throttler *Throttler[T]) ping(time time.Time) {
+	if throttler.pendingValue != nil {
+		throttler.notify(time, throttler.pendingValue)
+	}
+}
diff --git a/cmd/converge/throttler_test.go b/cmd/converge/throttler_test.go
new file mode 100644
index 0000000..57abc73
--- /dev/null
+++ b/cmd/converge/throttler_test.go
@@ -0,0 +1,50 @@
+package main
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+func Test_throttlerImmediateNotificationAfterInitialized(t *testing.T) {
+	value := 0
+	throttler := NewThrottler[int](func(v *int) {
+		value = *v
+	}, 1.0)
+
+	t0 := time.Now()
+	v := 1
+	throttler.notify(t0, &v)
+	assert.Equal(t, v, value)
+	value = 0
+	// subsequent ping will not lead to a notification
+	throttler.ping(t0.Add(10 * time.Second))
+	assert.Equal(t, 0, value)
+}
+
+func Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay(t *testing.T) {
+	value := 0
+	delayMs := 1000
+	throttler := NewThrottler[int](func(v *int) {
+		value = *v
+	}, float64(delayMs)/1000.0)
+
+	t0 := time.Now()
+	v1 := 1
+	// v2 will not be delivered, the last value in the time interval will be
+	v2 := 2
+	v3 := 3
+	throttler.notify(t0, &v1)
+	assert.Equal(t, v1, value)
+	throttler.notify(t0, &v2)
+	throttler.notify(t0, &v3)
+	assert.Equal(t, v1, value)
+	throttler.ping(t0.Add(time.Duration(delayMs-1) * time.Millisecond))
+	assert.Equal(t, v1, value)
+	throttler.ping(t0.Add(time.Duration(delayMs) * time.Millisecond))
+	assert.Equal(t, v3, value)
+	value = 0
+	// another ping won' deliver the same value again.
+	throttler.ping(t0.Add(time.Duration(delayMs*2) * time.Millisecond))
+	assert.Equal(t, 0, value)
+}