From cfccf04f9dc9b7d6b4bef48be0557a4a91fbfc86 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sat, 20 Jul 2024 13:35:49 +0200 Subject: [PATCH] working server * administration appears coorect * multiple clients for one agent * logging of active connections * simple echo server on the agent. --- cmd/agent/agent.go | 29 +++++++- cmd/converge/server.go | 160 ++++++++++++++++++++++++++--------------- go.mod | 1 + go.sum | 2 + pkg/iowrappers/io.go | 20 +++++- pkg/iowrappers/sync.go | 4 +- 6 files changed, 150 insertions(+), 66 deletions(-) diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index 07c650b..ba81480 100755 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -14,6 +14,7 @@ import ( "github.com/creack/pty" "github.com/gliderlabs/ssh" + "github.com/hashicorp/yamux" "github.com/pkg/sftp" ) @@ -113,7 +114,31 @@ func main() { wsConn := iowrappers.NewWebSocketConn(conn) defer wsConn.Close() - // echo server - iowrappers.SynchronizeStreams(wsConn, wsConn) + listener, err := yamux.Client(wsConn, nil) + if err != nil { + panic(err) + } + + log.Println("Connection established to rendez-vous server, waiting for debug sessions") + + // Session is a listener + for { + conn, err := listener.Accept() + if err != nil { + panic(err) + } + go handleConnection(conn) + } } + +func handleConnection(conn io.ReadWriter) { + //stdio := bufio.NewReadWriter( + // bufio.NewReaderSize(os.Stdin, 0), + // bufio.NewWriterSize(os.Stdout, 0)) + + // echo server + io.Copy(conn, conn) + + //iowrappers.SynchronizeStreams(conn, stdio) +} diff --git a/cmd/converge/server.go b/cmd/converge/server.go index 2316cde..14b0d5a 100644 --- a/cmd/converge/server.go +++ b/cmd/converge/server.go @@ -4,10 +4,14 @@ import ( "cidebug/pkg/iowrappers" "fmt" "github.com/gorilla/websocket" + "github.com/hashicorp/yamux" + "io" "log" + "net" "net/http" "regexp" "sync" + "time" ) var upgrader = websocket.Upgrader{ @@ -16,7 +20,7 @@ var upgrader = websocket.Upgrader{ } func handleWebSocket(w http.ResponseWriter, r *http.Request, - handler func(w http.ResponseWriter, r *http.Request, websockerConnection *iowrappers.WebSocketConn)) { + handler func(w http.ResponseWriter, r *http.Request, websockerConnection iowrappers.ReadWriteAddrCloser)) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println("Error upgrading to WebSocket:", err) @@ -29,7 +33,7 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request, } type WebSocketService struct { - handler func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) + handler func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) } func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request) { @@ -37,72 +41,106 @@ func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request) } type Agent struct { - agent *iowrappers.WebSocketConn - publicId string - client *iowrappers.WebSocketConn - agentAvailable chan bool - clientClosed chan bool + // server session + clientSession *yamux.Session + publicId string + startTime time.Time +} + +type Client struct { + publicId string + agent net.Conn + client iowrappers.ReadWriteAddrCloser + startTime time.Time } func NewAgent(publicId string, - agentConn *iowrappers.WebSocketConn, - clientConn *iowrappers.WebSocketConn) *Agent { + agentSession *yamux.Session) *Agent { return &Agent{ - agent: agentConn, - publicId: publicId, - client: clientConn, - agentAvailable: make(chan bool, 1), - clientClosed: make(chan bool, 1), + clientSession: agentSession, + publicId: publicId, + startTime: time.Now(), + } +} + +func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser, agentConn net.Conn) *Client { + return &Client{ + publicId: publicId, + agent: agentConn, + client: clientConn, + startTime: time.Now(), } } type Admin struct { // map of public id to agent - mutex sync.Mutex - agents map[string]*Agent + mutex sync.Mutex + agents map[string]*Agent + clients []*Client } func NewAdmin() *Admin { admin := Admin{ - mutex: sync.Mutex{}, - agents: make(map[string]*Agent), + mutex: sync.Mutex{}, + agents: make(map[string]*Agent), + clients: make([]*Client, 0), // not strictly needed } return &admin } -func (admin *Admin) addAgent(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) { +func (admin *Admin) logStatus() { + log.Printf("%-20s %-20s %-20s\n", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") + for _, agent := range admin.agents { + agent.clientSession.RemoteAddr() + log.Printf("%-20s %-20s %-20s\n", agent.publicId, + agent.startTime.Format("2006-01-02 15:04:05"), + agent.clientSession.RemoteAddr().String()) + } + log.Println("") + log.Printf("%-20s %-20s %-20s\n", "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") + for _, client := range admin.clients { + log.Printf("%-20s %-20s %-20s", client.publicId, + client.startTime.Format("2006-01-02 15:04:05"), + client.client.RemoteAddr()) + } + log.Printf("\n") +} + +func (admin *Admin) addAgent(publicId string, conn io.ReadWriteCloser) (*Agent, error) { admin.mutex.Lock() defer admin.mutex.Unlock() agent := admin.agents[publicId] - if agent != nil && agent.agent != nil && agent.agent != conn { + if agent != nil { return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId) } - if agent != nil { - agent.agent = conn - return agent, nil + session, err := yamux.Client(conn, nil) + if err != nil { + return nil, err } - agent = NewAgent(publicId, conn, nil) + agent = NewAgent(publicId, session) admin.agents[publicId] = agent + admin.logStatus() return agent, nil } -func (admin *Admin) addClient(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) { +func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser) (*Client, error) { admin.mutex.Lock() defer admin.mutex.Unlock() agent := admin.agents[publicId] - if agent != nil && agent.client != nil && agent.client != conn { + if agent == nil { // we should setup on-demend connections ot agents later. - return nil, fmt.Errorf("A different client with same publicId '%s' already registered", publicId) + return nil, fmt.Errorf("No agent found for publicId '%s'", publicId) } - if agent != nil { - agent.client = conn - return agent, nil + agentConn, err := agent.clientSession.Open() + if err != nil { + return nil, err } - agent = NewAgent(publicId, nil, conn) - admin.agents[publicId] = agent - return agent, nil + client := NewClient(publicId, clientConn, agentConn) + admin.clients = append(admin.clients, client) + admin.logStatus() + return client, nil } func (admin *Admin) RemoveAgent(publicId string) error { @@ -113,32 +151,39 @@ func (admin *Admin) RemoveAgent(publicId string) error { if agent == nil { return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) } - agent.agent = nil - if agent.client == nil { - log.Printf("Removing agent: '%s'", publicId) - delete(admin.agents, publicId) + log.Printf("Removing agent: '%s'", publicId) + err := agent.clientSession.Close() + if err != nil { + log.Printf("Could not close yamux client session for '%s'\n", publicId) } + delete(admin.agents, publicId) + admin.logStatus() return nil } -func (admin *Admin) RemoveClient(publicId string) error { +func (admin *Admin) RemoveClient(client *Client) error { admin.mutex.Lock() defer admin.mutex.Unlock() - agent := admin.agents[publicId] - if agent == nil { - return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) - } - agent.client = nil - if agent.agent == nil { - log.Printf("Removing client: '%s'", publicId) - delete(admin.agents, publicId) + log.Printf("Removing client: '%s' created at %s\n", client.publicId, + client.startTime.Format("2006-01-02 15:04:05")) + // try to explicitly close connection to the agent. + _ = client.agent.Close() + _ = client.client.Close() + + for i, _client := range admin.clients { + if _client == _client { + admin.clients = append(admin.clients[:i], admin.clients[i+1:]...) + break + } } + admin.logStatus() return nil } -func (admin *Admin) Register(publicId string, conn *iowrappers.WebSocketConn) error { +func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error { defer conn.Close() + // TODO: remove agent return value agent, err := admin.addAgent(publicId, conn) if err != nil { return err @@ -146,26 +191,24 @@ func (admin *Admin) Register(publicId string, conn *iowrappers.WebSocketConn) er defer func() { admin.RemoveAgent(publicId) }() - log.Printf("After defer remove agent\n") - agent.agentAvailable <- true log.Printf("Agent registered: '%s'\n", publicId) - <-agent.clientClosed + for !agent.clientSession.IsClosed() { + time.Sleep(1 * time.Second) + } return nil } -func (admin *Admin) Connect(publicId string, conn *iowrappers.WebSocketConn) error { +func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser) error { defer conn.Close() - agent, err := admin.addClient(publicId, conn) + client, err := admin.addClient(publicId, conn) if err != nil { return err } defer func() { - admin.RemoveClient(publicId) + admin.RemoveClient(client) }() - <-agent.agentAvailable log.Printf("Connecting client and agent: '%s'\n", publicId) - iowrappers.SynchronizeStreams(agent.client, agent.agent) - agent.clientClosed <- true + iowrappers.SynchronizeStreams(client.client, client.agent) return nil } @@ -184,14 +227,13 @@ func parsePublicId(path string) (publicId string, _ error) { return "", fmt.Errorf("Invalid URL path '%s'", path) } return matches[1], nil - } func main() { admin := NewAdmin() registrationService := WebSocketService{ - handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) { + handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) { publicId, err := parsePublicId(r.URL.Path) if err != nil { log.Printf("Cannot parse public id from url: '%v'\n", err) @@ -205,7 +247,7 @@ func main() { }, } clientService := WebSocketService{ - handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) { + handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) { publicId, err := parsePublicId(r.URL.Path) if err != nil { log.Printf("Cannot parse public id from url: '%v'\n", err) diff --git a/go.mod b/go.mod index 2f22ceb..40ce1fe 100755 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/creack/pty v1.1.21 github.com/gliderlabs/ssh v0.3.7 github.com/gorilla/websocket v1.5.3 + github.com/hashicorp/yamux v0.1.1 github.com/pkg/sftp v1.13.6 golang.org/x/crypto v0.25.0 golang.org/x/term v0.22.0 diff --git a/go.sum b/go.sum index 4762ea3..b17c2df 100755 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/gliderlabs/ssh v0.3.7 h1:iV3Bqi942d9huXnzEF2Mt+CY9gLu8DNM4Obd+8bODRE= github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= +github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= diff --git a/pkg/iowrappers/io.go b/pkg/iowrappers/io.go index 68ca433..5cbcc72 100644 --- a/pkg/iowrappers/io.go +++ b/pkg/iowrappers/io.go @@ -1,14 +1,20 @@ package iowrappers -import "github.com/gorilla/websocket" +import ( + "github.com/gorilla/websocket" + "io" + "net" +) type WebSocketConn struct { conn *websocket.Conn buf []byte } -func NewWebSocketConn(conn *websocket.Conn) *WebSocketConn { - return &WebSocketConn{conn: conn} +type ReadWriteAddrCloser interface { + io.ReadWriteCloser + + RemoteAddr() net.Addr } func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) { @@ -26,6 +32,10 @@ func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) { return n, err } +func NewWebSocketConn(conn *websocket.Conn) *WebSocketConn { + return &WebSocketConn{conn: conn} +} + func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) { err = websocketConn.conn.WriteMessage(websocket.BinaryMessage, p) if err == nil { @@ -37,3 +47,7 @@ func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) { func (websocketConn *WebSocketConn) Close() error { return websocketConn.conn.Close() } + +func (websocketConn *WebSocketConn) RemoteAddr() net.Addr { + return websocketConn.conn.RemoteAddr() +} diff --git a/pkg/iowrappers/sync.go b/pkg/iowrappers/sync.go index 194471a..2f0437f 100644 --- a/pkg/iowrappers/sync.go +++ b/pkg/iowrappers/sync.go @@ -14,7 +14,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { }() _, err := io.Copy(stream1, stream2) if err != nil { - log.Printf("error %v\n", err) + log.Printf("sync streams error(1) %v\n", err) } }() @@ -23,7 +23,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { waitChannel <- true }() _, err := io.Copy(stream2, stream1) - log.Printf("Error %v\n", err) + log.Printf("sync streams error(2) %v\n", err) }() <-waitChannel