using unbuffered channels everywhere now. Only change required was to initialize prometeus and the websessions before the matchmaker. This is because at startup the matchmaker wants to write a notification when it starts up but then prometheus and the websessions would not be there to read them.

Alternative solution would be to run all initialization code in go routines to make it independent of initialization order but having a defined initialization order is much cleaner.
This commit is contained in:
Erik Brakkee 2024-08-13 21:33:29 +02:00
parent aa05e5819b
commit fd064f249f
7 changed files with 24 additions and 18 deletions

View File

@ -58,6 +58,13 @@ func printHelp(msg string) {
} }
func main() { func main() {
// for debugging hangs relating to using unbuffered channels
//go func() {
// buf := make([]byte, 1000000)
// n := runtime.Stack(buf, true)
// time.Sleep(10 * time.Second)
// log.Println("LOG: \n" + string(buf[:n]))
//}()
downloaddir := "." downloaddir := "."
staticdir := "../static" staticdir := "../static"
@ -96,8 +103,14 @@ func main() {
} }
notifications := NewStateNotifier() notifications := NewStateNotifier()
admin := matchmaker.NewMatchMaker(notifications)
websessions := matchmaker.NewWebSessions(notifications.webNotificationChannel) websessions := matchmaker.NewWebSessions(notifications.webNotificationChannel)
// monitoring
mux := http.NewServeMux()
setupPrometheus(mux, notifications.prometheusNotificationChannel)
go func() {
log.Fatal(http.ListenAndServe(":8001", mux))
}()
admin := matchmaker.NewMatchMaker(notifications)
// For agents connecting // For agents connecting
registrationService := websocketutil.WebSocketService{ registrationService := websocketutil.WebSocketService{
@ -167,13 +180,6 @@ func main() {
http.HandleFunc("/", catchAllHandler(contextpath)) http.HandleFunc("/", catchAllHandler(contextpath))
// monitoring
mux := http.NewServeMux()
setupPrometheus(mux, notifications.prometheusNotificationChannel)
go func() {
log.Fatal(http.ListenAndServe(":8001", mux))
}()
// Start HTTP server // Start HTTP server
fmt.Println("Rendez-vous server listening on :8000") fmt.Println("Rendez-vous server listening on :8000")
log.Fatal(http.ListenAndServe(":8000", nil)) log.Fatal(http.ListenAndServe(":8000", nil))

View File

@ -9,8 +9,8 @@ type StateNotifier struct {
func NewStateNotifier() *StateNotifier { func NewStateNotifier() *StateNotifier {
return &StateNotifier{ return &StateNotifier{
webNotificationChannel: make(chan *models.State, 10), webNotificationChannel: make(chan *models.State),
prometheusNotificationChannel: make(chan *models.State, 10), prometheusNotificationChannel: make(chan *models.State),
} }
} }

View File

@ -191,7 +191,7 @@ func setupPrometheus(mux *http.ServeMux, notifications chan *models.State) {
mux.Handle("/metrics", promhttp.Handler()) mux.Handle("/metrics", promhttp.Handler())
} }
var prometheusChannel = make(chan func(), 10) var prometheusChannel = make(chan func())
// serialize notifidcations and periodi updates of the duration. // serialize notifidcations and periodi updates of the duration.

View File

@ -54,5 +54,5 @@ func main() {
} }
// Block main goroutine forever // Block main goroutine forever
<-make(chan struct{}, 10) <-make(chan struct{})
} }

View File

@ -69,7 +69,7 @@ var helpMessageTemplate string
var helpMessage = formatHelpMessage() var helpMessage = formatHelpMessage()
// Events channel for asynchronous events. // Events channel for asynchronous events.
var events = make(chan func(), 10) var events = make(chan func())
// External interface, asynchronous, apart from the initialization. // External interface, asynchronous, apart from the initialization.

View File

@ -64,8 +64,8 @@ func ReceiveAsync[T any](channel GOBChannel, result chan T, errors chan<- error)
} }
func SendWithTimeout[T any](channel GOBChannel, obj T) error { func SendWithTimeout[T any](channel GOBChannel, obj T) error {
done := make(chan bool, 10) done := make(chan bool)
errors := make(chan error, 10) errors := make(chan error)
SendAsync(channel, obj, done, errors) SendAsync(channel, obj, done, errors)
select { select {
@ -79,8 +79,8 @@ func SendWithTimeout[T any](channel GOBChannel, obj T) error {
} }
func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) { func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) {
result := make(chan T, 10) result := make(chan T)
errors := make(chan error, 10) errors := make(chan error)
ReceiveAsync(channel, result, errors) ReceiveAsync(channel, result, errors)
select { select {

View File

@ -73,7 +73,7 @@ func (sessions *WebSessions) NewSession(wsConnection net.Conn, ctx context.Conte
sessions.mutex.Lock() sessions.mutex.Lock()
defer sessions.mutex.Unlock() defer sessions.mutex.Unlock()
session := &WebSession{ session := &WebSession{
notifications: make(chan *models.State, 10), notifications: make(chan *models.State),
conn: wsConnection, conn: wsConnection,
ctx: ctx, ctx: ctx,
} }