diff --git a/cmd/converge/server.go b/cmd/converge/server.go index 14b0d5a..5908ef7 100644 --- a/cmd/converge/server.go +++ b/cmd/converge/server.go @@ -1,17 +1,13 @@ package main import ( + "cidebug/pkg/converge" "cidebug/pkg/iowrappers" "fmt" "github.com/gorilla/websocket" - "github.com/hashicorp/yamux" - "io" "log" - "net" "net/http" "regexp" - "sync" - "time" ) var upgrader = websocket.Upgrader{ @@ -40,186 +36,6 @@ func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request) handleWebSocket(w, r, endpoint.handler) } -type Agent struct { - // server session - clientSession *yamux.Session - publicId string - startTime time.Time -} - -type Client struct { - publicId string - agent net.Conn - client iowrappers.ReadWriteAddrCloser - startTime time.Time -} - -func NewAgent(publicId string, - agentSession *yamux.Session) *Agent { - return &Agent{ - clientSession: agentSession, - publicId: publicId, - startTime: time.Now(), - } -} - -func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser, agentConn net.Conn) *Client { - return &Client{ - publicId: publicId, - agent: agentConn, - client: clientConn, - startTime: time.Now(), - } -} - -type Admin struct { - // map of public id to agent - mutex sync.Mutex - agents map[string]*Agent - clients []*Client -} - -func NewAdmin() *Admin { - admin := Admin{ - mutex: sync.Mutex{}, - agents: make(map[string]*Agent), - clients: make([]*Client, 0), // not strictly needed - } - return &admin -} - -func (admin *Admin) logStatus() { - log.Printf("%-20s %-20s %-20s\n", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") - for _, agent := range admin.agents { - agent.clientSession.RemoteAddr() - log.Printf("%-20s %-20s %-20s\n", agent.publicId, - agent.startTime.Format("2006-01-02 15:04:05"), - agent.clientSession.RemoteAddr().String()) - } - log.Println("") - log.Printf("%-20s %-20s %-20s\n", "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") - for _, client := range admin.clients { - log.Printf("%-20s %-20s %-20s", client.publicId, - client.startTime.Format("2006-01-02 15:04:05"), - client.client.RemoteAddr()) - } - log.Printf("\n") -} - -func (admin *Admin) addAgent(publicId string, conn io.ReadWriteCloser) (*Agent, error) { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - agent := admin.agents[publicId] - if agent != nil { - return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId) - } - session, err := yamux.Client(conn, nil) - if err != nil { - return nil, err - } - agent = NewAgent(publicId, session) - admin.agents[publicId] = agent - admin.logStatus() - return agent, nil -} - -func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser) (*Client, error) { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - agent := admin.agents[publicId] - if agent == nil { - // we should setup on-demend connections ot agents later. - return nil, fmt.Errorf("No agent found for publicId '%s'", publicId) - } - agentConn, err := agent.clientSession.Open() - if err != nil { - return nil, err - } - client := NewClient(publicId, clientConn, agentConn) - admin.clients = append(admin.clients, client) - admin.logStatus() - return client, nil -} - -func (admin *Admin) RemoveAgent(publicId string) error { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - agent := admin.agents[publicId] - if agent == nil { - return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) - } - log.Printf("Removing agent: '%s'", publicId) - err := agent.clientSession.Close() - if err != nil { - log.Printf("Could not close yamux client session for '%s'\n", publicId) - } - delete(admin.agents, publicId) - admin.logStatus() - return nil -} - -func (admin *Admin) RemoveClient(client *Client) error { - admin.mutex.Lock() - defer admin.mutex.Unlock() - - log.Printf("Removing client: '%s' created at %s\n", client.publicId, - client.startTime.Format("2006-01-02 15:04:05")) - // try to explicitly close connection to the agent. - _ = client.agent.Close() - _ = client.client.Close() - - for i, _client := range admin.clients { - if _client == _client { - admin.clients = append(admin.clients[:i], admin.clients[i+1:]...) - break - } - } - admin.logStatus() - return nil -} - -func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error { - defer conn.Close() - // TODO: remove agent return value - agent, err := admin.addAgent(publicId, conn) - if err != nil { - return err - } - defer func() { - admin.RemoveAgent(publicId) - }() - log.Printf("Agent registered: '%s'\n", publicId) - for !agent.clientSession.IsClosed() { - time.Sleep(1 * time.Second) - } - return nil -} - -func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser) error { - defer conn.Close() - client, err := admin.addClient(publicId, conn) - if err != nil { - return err - } - defer func() { - admin.RemoveClient(client) - }() - log.Printf("Connecting client and agent: '%s'\n", publicId) - iowrappers.SynchronizeStreams(client.client, client.agent) - return nil -} - -func (admin *Admin) log() { - log.Println("CONNECTIONS") - for _, agent := range admin.agents { - log.Println(agent.publicId) - } - log.Printf("\n") -} - func parsePublicId(path string) (publicId string, _ error) { pattern := regexp.MustCompile("^/[^/]+/([^/]+)$") matches := pattern.FindStringSubmatch(path) @@ -231,7 +47,7 @@ func parsePublicId(path string) (publicId string, _ error) { func main() { - admin := NewAdmin() + admin := converge.NewAdmin() registrationService := WebSocketService{ handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) { publicId, err := parsePublicId(r.URL.Path) @@ -265,6 +81,6 @@ func main() { http.HandleFunc("/client/", clientService.handle) // Start HTTP server - fmt.Println("WebSocket server listening on :8000") + fmt.Println("Rendez-vous server listening on :8000") log.Fatal(http.ListenAndServe(":8000", nil)) } diff --git a/pkg/converge/admin.go b/pkg/converge/admin.go new file mode 100644 index 0000000..2f7e058 --- /dev/null +++ b/pkg/converge/admin.go @@ -0,0 +1,192 @@ +package converge + +import ( + "cidebug/pkg/iowrappers" + "fmt" + "github.com/hashicorp/yamux" + "io" + "log" + "net" + "sync" + "time" +) + +type Agent struct { + // server session + clientSession *yamux.Session + publicId string + startTime time.Time +} + +type Client struct { + publicId string + agent net.Conn + client iowrappers.ReadWriteAddrCloser + startTime time.Time +} + +func NewAgent(publicId string, + agentSession *yamux.Session) *Agent { + return &Agent{ + clientSession: agentSession, + publicId: publicId, + startTime: time.Now(), + } +} + +func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser, agentConn net.Conn) *Client { + return &Client{ + publicId: publicId, + agent: agentConn, + client: clientConn, + startTime: time.Now(), + } +} + +type Admin struct { + // map of public id to agent + mutex sync.Mutex + agents map[string]*Agent + clients []*Client +} + +func NewAdmin() *Admin { + admin := Admin{ + mutex: sync.Mutex{}, + agents: make(map[string]*Agent), + clients: make([]*Client, 0), // not strictly needed + } + return &admin +} + +func (admin *Admin) logStatus() { + log.Printf("%-20s %-20s %-20s\n", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") + for _, agent := range admin.agents { + agent.clientSession.RemoteAddr() + log.Printf("%-20s %-20s %-20s\n", agent.publicId, + agent.startTime.Format("2006-01-02 15:04:05"), + agent.clientSession.RemoteAddr().String()) + } + log.Println("") + log.Printf("%-20s %-20s %-20s\n", "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") + for _, client := range admin.clients { + log.Printf("%-20s %-20s %-20s", client.publicId, + client.startTime.Format("2006-01-02 15:04:05"), + client.client.RemoteAddr()) + } + log.Printf("\n") +} + +func (admin *Admin) addAgent(publicId string, conn io.ReadWriteCloser) (*Agent, error) { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + agent := admin.agents[publicId] + if agent != nil { + return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId) + } + session, err := yamux.Client(conn, nil) + if err != nil { + return nil, err + } + agent = NewAgent(publicId, session) + admin.agents[publicId] = agent + admin.logStatus() + return agent, nil +} + +func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser) (*Client, error) { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + agent := admin.agents[publicId] + if agent == nil { + // we should setup on-demend connections ot agents later. + return nil, fmt.Errorf("No agent found for publicId '%s'", publicId) + } + agentConn, err := agent.clientSession.Open() + if err != nil { + return nil, err + } + client := NewClient(publicId, clientConn, agentConn) + admin.clients = append(admin.clients, client) + admin.logStatus() + return client, nil +} + +func (admin *Admin) RemoveAgent(publicId string) error { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + agent := admin.agents[publicId] + if agent == nil { + return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) + } + log.Printf("Removing agent: '%s'", publicId) + err := agent.clientSession.Close() + if err != nil { + log.Printf("Could not close yamux client session for '%s'\n", publicId) + } + delete(admin.agents, publicId) + admin.logStatus() + return nil +} + +func (admin *Admin) RemoveClient(client *Client) error { + admin.mutex.Lock() + defer admin.mutex.Unlock() + + log.Printf("Removing client: '%s' created at %s\n", client.publicId, + client.startTime.Format("2006-01-02 15:04:05")) + // try to explicitly close connection to the agent. + _ = client.agent.Close() + _ = client.client.Close() + + for i, _client := range admin.clients { + if _client == _client { + admin.clients = append(admin.clients[:i], admin.clients[i+1:]...) + break + } + } + admin.logStatus() + return nil +} + +func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error { + defer conn.Close() + // TODO: remove agent return value + agent, err := admin.addAgent(publicId, conn) + if err != nil { + return err + } + defer func() { + admin.RemoveAgent(publicId) + }() + log.Printf("Agent registered: '%s'\n", publicId) + for !agent.clientSession.IsClosed() { + time.Sleep(1 * time.Second) + } + return nil +} + +func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser) error { + defer conn.Close() + client, err := admin.addClient(publicId, conn) + if err != nil { + return err + } + defer func() { + admin.RemoveClient(client) + }() + log.Printf("Connecting client and agent: '%s'\n", publicId) + iowrappers.SynchronizeStreams(client.client, client.agent) + return nil +} + +func (admin *Admin) log() { + log.Println("CONNECTIONS") + for _, agent := range admin.agents { + log.Println(agent.publicId) + } + log.Printf("\n") +}