rendezvous mechanism with fixed id is now working.
This commit is contained in:
parent
f59c8368ea
commit
e2c2cbd0ef
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
var upgrader = websocket.Upgrader{
|
var upgrader = websocket.Upgrader{
|
||||||
@ -34,33 +35,118 @@ func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request)
|
|||||||
handleWebSocket(w, r, endpoint.handler)
|
handleWebSocket(w, r, endpoint.handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
var client *iowrappers.WebSocketConn
|
type Agent struct {
|
||||||
var agent *iowrappers.WebSocketConn
|
agent *iowrappers.WebSocketConn
|
||||||
|
publicId string
|
||||||
|
client *iowrappers.WebSocketConn
|
||||||
|
agentAvailable chan bool
|
||||||
|
clientClosed chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAgent(publicId string,
|
||||||
|
agentConn *iowrappers.WebSocketConn,
|
||||||
|
clientConn *iowrappers.WebSocketConn) *Agent {
|
||||||
|
return &Agent{
|
||||||
|
agent: agentConn,
|
||||||
|
publicId: publicId,
|
||||||
|
client: clientConn,
|
||||||
|
agentAvailable: make(chan bool),
|
||||||
|
clientClosed: make(chan bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Admin struct {
|
||||||
|
// map of public id to agent
|
||||||
|
mutex sync.Mutex
|
||||||
|
agents map[string]*Agent
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAdmin() *Admin {
|
||||||
|
admin := Admin{
|
||||||
|
mutex: sync.Mutex{},
|
||||||
|
agents: make(map[string]*Agent),
|
||||||
|
}
|
||||||
|
return &admin
|
||||||
|
}
|
||||||
|
|
||||||
|
func (admin *Admin) addAgent(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) {
|
||||||
|
admin.mutex.Lock()
|
||||||
|
defer admin.mutex.Unlock()
|
||||||
|
|
||||||
|
agent := admin.agents[publicId]
|
||||||
|
if agent != nil && agent.agent != nil && agent.agent != conn {
|
||||||
|
return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId)
|
||||||
|
}
|
||||||
|
if agent != nil {
|
||||||
|
agent.agent = conn
|
||||||
|
return agent, nil
|
||||||
|
}
|
||||||
|
agent = NewAgent(publicId, conn, nil)
|
||||||
|
admin.agents[publicId] = agent
|
||||||
|
return agent, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (admin *Admin) addClient(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) {
|
||||||
|
admin.mutex.Lock()
|
||||||
|
defer admin.mutex.Unlock()
|
||||||
|
|
||||||
|
agent := admin.agents[publicId]
|
||||||
|
if agent != nil && agent.client != nil && agent.client != conn {
|
||||||
|
// we should setup on-demend connections ot agents later.
|
||||||
|
return nil, fmt.Errorf("A different client with same publicId '%s' already registered", publicId)
|
||||||
|
}
|
||||||
|
if agent != nil {
|
||||||
|
agent.client = conn
|
||||||
|
return agent, nil
|
||||||
|
}
|
||||||
|
agent = NewAgent(publicId, nil, conn)
|
||||||
|
admin.agents[publicId] = agent
|
||||||
|
return agent, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (admin *Admin) Register(publicId string, conn *iowrappers.WebSocketConn) error {
|
||||||
|
defer conn.Close()
|
||||||
|
agent, err := admin.addAgent(publicId, conn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
agent.agentAvailable <- true
|
||||||
|
<-agent.clientClosed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (admin *Admin) Connect(publicId string, conn *iowrappers.WebSocketConn) error {
|
||||||
|
defer conn.Close()
|
||||||
|
agent, err := admin.addClient(publicId, conn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
<-agent.agentAvailable
|
||||||
|
log.Println("Connecting websockets")
|
||||||
|
iowrappers.SynchronizeStreams(agent.client, agent.agent)
|
||||||
|
agent.clientClosed <- true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// Connect to TCP server
|
|
||||||
|
|
||||||
clientClosed := make(chan bool)
|
|
||||||
serverAvailable := make(chan bool)
|
|
||||||
|
|
||||||
|
admin := NewAdmin()
|
||||||
registrationService := WebSocketService{
|
registrationService := WebSocketService{
|
||||||
handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) {
|
handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) {
|
||||||
log.Println("Got registration connection")
|
log.Println("Got registration connection")
|
||||||
agent = conn
|
err := admin.Register("abc123", conn)
|
||||||
serverAvailable <- true
|
if err != nil {
|
||||||
<-clientClosed
|
log.Printf("Error %v", err)
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
clientService := WebSocketService{
|
clientService := WebSocketService{
|
||||||
handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) {
|
handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) {
|
||||||
log.Println("Got client connection")
|
log.Println("Got client connection")
|
||||||
client = conn
|
err := admin.Connect("abc123", conn)
|
||||||
if agent == nil {
|
if err != nil {
|
||||||
log.Println("There is no agent yet,, closing")
|
log.Printf("Error %v", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
iowrappers.SynchronizeStreams(client, agent)
|
|
||||||
clientClosed <- true
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user