From e2c2cbd0ef48e05aad3d4db5d6bb774ce24b861f Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Fri, 19 Jul 2024 20:29:51 +0200 Subject: [PATCH] rendezvous mechanism with fixed id is now working. --- cmd/converge/server.go | 116 +++++++++++++++++++++++++++++++++++------ 1 file changed, 101 insertions(+), 15 deletions(-) diff --git a/cmd/converge/server.go b/cmd/converge/server.go index c8756ac..dda123a 100644 --- a/cmd/converge/server.go +++ b/cmd/converge/server.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/websocket" "log" "net/http" + "sync" ) var upgrader = websocket.Upgrader{ @@ -34,33 +35,118 @@ func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request) handleWebSocket(w, r, endpoint.handler) } -var client *iowrappers.WebSocketConn -var agent *iowrappers.WebSocketConn +type Agent struct { + agent *iowrappers.WebSocketConn + publicId string + client *iowrappers.WebSocketConn + agentAvailable chan bool + clientClosed chan bool +} + +func NewAgent(publicId string, + agentConn *iowrappers.WebSocketConn, + clientConn *iowrappers.WebSocketConn) *Agent { + return &Agent{ + agent: agentConn, + publicId: publicId, + client: clientConn, + agentAvailable: make(chan bool), + clientClosed: make(chan bool), + } +} + +type Admin struct { + // map of public id to agent + mutex sync.Mutex + agents map[string]*Agent +} + +func NewAdmin() *Admin { + admin := Admin{ + mutex: sync.Mutex{}, + agents: make(map[string]*Agent), + } + return &admin +} + +func (admin *Admin) addAgent(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + agent := admin.agents[publicId] + if agent != nil && agent.agent != nil && agent.agent != conn { + return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId) + } + if agent != nil { + agent.agent = conn + return agent, nil + } + agent = NewAgent(publicId, conn, nil) + admin.agents[publicId] = agent + return agent, nil +} + +func (admin *Admin) addClient(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + agent := admin.agents[publicId] + if agent != nil && agent.client != nil && agent.client != conn { + // we should setup on-demend connections ot agents later. + return nil, fmt.Errorf("A different client with same publicId '%s' already registered", publicId) + } + if agent != nil { + agent.client = conn + return agent, nil + } + agent = NewAgent(publicId, nil, conn) + admin.agents[publicId] = agent + return agent, nil +} + +func (admin *Admin) Register(publicId string, conn *iowrappers.WebSocketConn) error { + defer conn.Close() + agent, err := admin.addAgent(publicId, conn) + if err != nil { + return err + } + agent.agentAvailable <- true + <-agent.clientClosed + return nil +} + +func (admin *Admin) Connect(publicId string, conn *iowrappers.WebSocketConn) error { + defer conn.Close() + agent, err := admin.addClient(publicId, conn) + if err != nil { + return err + } + <-agent.agentAvailable + log.Println("Connecting websockets") + iowrappers.SynchronizeStreams(agent.client, agent.agent) + agent.clientClosed <- true + return nil +} func main() { - // Connect to TCP server - - clientClosed := make(chan bool) - serverAvailable := make(chan bool) + admin := NewAdmin() registrationService := WebSocketService{ handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) { log.Println("Got registration connection") - agent = conn - serverAvailable <- true - <-clientClosed + err := admin.Register("abc123", conn) + if err != nil { + log.Printf("Error %v", err) + } }, } clientService := WebSocketService{ handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) { log.Println("Got client connection") - client = conn - if agent == nil { - log.Println("There is no agent yet,, closing") - return + err := admin.Connect("abc123", conn) + if err != nil { + log.Printf("Error %v", err) } - iowrappers.SynchronizeStreams(client, agent) - clientClosed <- true }, }