throttling implemented for events.
This commit is contained in:
parent
9634cd29f2
commit
fed2aaeaf9
@ -12,6 +12,7 @@ import (
|
|||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
_ "time/tzdata"
|
_ "time/tzdata"
|
||||||
)
|
)
|
||||||
@ -54,7 +55,9 @@ func printHelp(msg string) {
|
|||||||
" a different context path. For instance to host converge at a base\n" +
|
" a different context path. For instance to host converge at a base\n" +
|
||||||
" URL of https://example.com/converge/, specify /converge/ (with\n" +
|
" URL of https://example.com/converge/, specify /converge/ (with\n" +
|
||||||
" trailing slash. \n" +
|
" trailing slash. \n" +
|
||||||
"--pprof: Enable the pprof endpoint at /debug/pprof"
|
"--pprof: Enable the pprof endpoint at /debug/pprof\n" +
|
||||||
|
"--throttling-interval: Minimum delay between notificaitons ot users and more updating \n" +
|
||||||
|
" prometheus statistics (default 2s). "
|
||||||
fmt.Fprintln(os.Stderr, helpText)
|
fmt.Fprintln(os.Stderr, helpText)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
@ -72,6 +75,7 @@ func main() {
|
|||||||
staticdir := "../static"
|
staticdir := "../static"
|
||||||
contextpath := "/"
|
contextpath := "/"
|
||||||
pprof := false
|
pprof := false
|
||||||
|
throttlingInterval := 2.0
|
||||||
|
|
||||||
args := os.Args[1:]
|
args := os.Args[1:]
|
||||||
for len(args) > 0 && strings.HasPrefix(args[0], "-") {
|
for len(args) > 0 && strings.HasPrefix(args[0], "-") {
|
||||||
@ -96,6 +100,16 @@ func main() {
|
|||||||
args = args[1:]
|
args = args[1:]
|
||||||
case "--pprof":
|
case "--pprof":
|
||||||
pprof = true
|
pprof = true
|
||||||
|
case "--throttling-interval":
|
||||||
|
if len(args) <= 1 {
|
||||||
|
printHelp("The --throttling-interval option expects an argument")
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
throttlingInterval, err = strconv.ParseFloat(args[1], 64)
|
||||||
|
if err != nil {
|
||||||
|
printHelp(fmt.Sprintf("Invalid value for throttling interval '%s;", args[1]))
|
||||||
|
}
|
||||||
|
args = args[1:]
|
||||||
default:
|
default:
|
||||||
printHelp("Unknown option " + args[0])
|
printHelp("Unknown option " + args[0])
|
||||||
}
|
}
|
||||||
@ -112,7 +126,7 @@ func main() {
|
|||||||
// Prometheus
|
// Prometheus
|
||||||
// And the MatchMaker. The MatchMakers sends state notifications to websessions
|
// And the MatchMaker. The MatchMakers sends state notifications to websessions
|
||||||
// and prometheus.
|
// and prometheus.
|
||||||
notifications := NewStateNotifier()
|
notifications := NewStateNotifier(throttlingInterval)
|
||||||
websessions := matchmaker.NewWebSessions(notifications.webNotificationChannel)
|
websessions := matchmaker.NewWebSessions(notifications.webNotificationChannel)
|
||||||
// monitoring
|
// monitoring
|
||||||
prometheusMux := http.NewServeMux()
|
prometheusMux := http.NewServeMux()
|
||||||
|
@ -1,20 +1,42 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import "git.wamblee.org/converge/pkg/models"
|
import (
|
||||||
|
"git.wamblee.org/converge/pkg/models"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type StateNotifier struct {
|
type StateNotifier struct {
|
||||||
|
throttler Throttler[models.State]
|
||||||
|
eventChannel chan *models.State
|
||||||
webNotificationChannel chan *models.State
|
webNotificationChannel chan *models.State
|
||||||
prometheusNotificationChannel chan *models.State
|
prometheusNotificationChannel chan *models.State
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateNotifier() *StateNotifier {
|
func NewStateNotifier(minDelaySeconds float64) *StateNotifier {
|
||||||
return &StateNotifier{
|
notifier := StateNotifier{
|
||||||
|
eventChannel: make(chan *models.State),
|
||||||
webNotificationChannel: make(chan *models.State),
|
webNotificationChannel: make(chan *models.State),
|
||||||
prometheusNotificationChannel: make(chan *models.State),
|
prometheusNotificationChannel: make(chan *models.State),
|
||||||
}
|
}
|
||||||
|
notifier.throttler = NewThrottler(func(state *models.State) {
|
||||||
|
notifier.webNotificationChannel <- state
|
||||||
|
notifier.prometheusNotificationChannel <- state
|
||||||
|
}, minDelaySeconds)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
notifier.throttler.ping(time.Now())
|
||||||
|
case state := <-notifier.eventChannel:
|
||||||
|
notifier.throttler.notify(time.Now(), state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return ¬ifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (notifier StateNotifier) Publish(state *models.State) {
|
func (notifier StateNotifier) Publish(state *models.State) {
|
||||||
notifier.webNotificationChannel <- state
|
notifier.eventChannel <- state
|
||||||
notifier.prometheusNotificationChannel <- state
|
|
||||||
}
|
}
|
||||||
|
45
cmd/converge/throttler.go
Normal file
45
cmd/converge/throttler.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Throttling notifications to prometheus and web clients
|
||||||
|
// TO be used in a single-threaded manner.
|
||||||
|
type Throttler[T any] struct {
|
||||||
|
minDelaySeconds float64
|
||||||
|
// ucntion to call to implement the notification.
|
||||||
|
notifier func(t *T)
|
||||||
|
lastReportedTime time.Time
|
||||||
|
pendingValue *T
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewThrottler[T any](notifier func(t *T), minDelaySeconds float64) Throttler[T] {
|
||||||
|
throttler := Throttler[T]{
|
||||||
|
minDelaySeconds: minDelaySeconds,
|
||||||
|
notifier: notifier,
|
||||||
|
lastReportedTime: time.Time{},
|
||||||
|
pendingValue: nil,
|
||||||
|
}
|
||||||
|
return throttler
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify there is a new value. Performs notification if it was long enough ago
|
||||||
|
// for the last notification to be sent. If not, it is stored as a pending event to
|
||||||
|
// be sent later. New events that come in before a notification is sent override the
|
||||||
|
// pending event.
|
||||||
|
func (throttler *Throttler[T]) notify(time time.Time, value *T) {
|
||||||
|
if time.Sub(throttler.lastReportedTime).Seconds() >= throttler.minDelaySeconds {
|
||||||
|
throttler.notifier(value)
|
||||||
|
throttler.lastReportedTime = time
|
||||||
|
throttler.pendingValue = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
throttler.pendingValue = value
|
||||||
|
}
|
||||||
|
|
||||||
|
// To be called periodically. It sends out any pending events if the time the last
|
||||||
|
// notification was sent is long enough ago.
|
||||||
|
func (throttler *Throttler[T]) ping(time time.Time) {
|
||||||
|
if throttler.pendingValue != nil {
|
||||||
|
throttler.notify(time, throttler.pendingValue)
|
||||||
|
}
|
||||||
|
}
|
50
cmd/converge/throttler_test.go
Normal file
50
cmd/converge/throttler_test.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_throttlerImmediateNotificationAfterInitialized(t *testing.T) {
|
||||||
|
value := 0
|
||||||
|
throttler := NewThrottler[int](func(v *int) {
|
||||||
|
value = *v
|
||||||
|
}, 1.0)
|
||||||
|
|
||||||
|
t0 := time.Now()
|
||||||
|
v := 1
|
||||||
|
throttler.notify(t0, &v)
|
||||||
|
assert.Equal(t, v, value)
|
||||||
|
value = 0
|
||||||
|
// subsequent ping will not lead to a notification
|
||||||
|
throttler.ping(t0.Add(10 * time.Second))
|
||||||
|
assert.Equal(t, 0, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_TwoNotificationsInSHortSucessionSecondOneIsDeliverdWithDelay(t *testing.T) {
|
||||||
|
value := 0
|
||||||
|
delayMs := 1000
|
||||||
|
throttler := NewThrottler[int](func(v *int) {
|
||||||
|
value = *v
|
||||||
|
}, float64(delayMs)/1000.0)
|
||||||
|
|
||||||
|
t0 := time.Now()
|
||||||
|
v1 := 1
|
||||||
|
// v2 will not be delivered, the last value in the time interval will be
|
||||||
|
v2 := 2
|
||||||
|
v3 := 3
|
||||||
|
throttler.notify(t0, &v1)
|
||||||
|
assert.Equal(t, v1, value)
|
||||||
|
throttler.notify(t0, &v2)
|
||||||
|
throttler.notify(t0, &v3)
|
||||||
|
assert.Equal(t, v1, value)
|
||||||
|
throttler.ping(t0.Add(time.Duration(delayMs-1) * time.Millisecond))
|
||||||
|
assert.Equal(t, v1, value)
|
||||||
|
throttler.ping(t0.Add(time.Duration(delayMs) * time.Millisecond))
|
||||||
|
assert.Equal(t, v3, value)
|
||||||
|
value = 0
|
||||||
|
// another ping won' deliver the same value again.
|
||||||
|
throttler.ping(t0.Add(time.Duration(delayMs*2) * time.Millisecond))
|
||||||
|
assert.Equal(t, 0, value)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user