From 8981efd0b5a7075e97e887f47a0f9e6f5f2a3119 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sat, 20 Jul 2024 21:55:34 +0200 Subject: [PATCH] lots of restructuring. Experimensts with websockets over yamux failed. Now going to use a second connection to the server from the agent. --- cmd/agent/agent.go | 10 ++--- cmd/converge/server.go | 6 +-- cmd/tcptows/tcptows.go | 3 +- cmd/wstotcp/wstotcp.go | 3 +- pkg/converge/admin.go | 7 ++- pkg/iowrappers/io.go | 41 ----------------- pkg/websocketutil/connections.go | 77 ++++++++++++++++++++++++++++++++ pkg/websocketutil/listener.go | 34 ++++++++++++++ pkg/websocketutil/services.go | 17 +++++-- 9 files changed, 142 insertions(+), 56 deletions(-) create mode 100644 pkg/websocketutil/connections.go create mode 100644 pkg/websocketutil/listener.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index dbb2fa6..3df9cf5 100755 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -3,6 +3,7 @@ package main import ( "bufio" "cidebug/pkg/iowrappers" + "cidebug/pkg/websocketutil" "crypto/rand" "crypto/rsa" "crypto/x509" @@ -98,6 +99,7 @@ func sshServer(hostKeyFile string) *ssh.Server { } func echoServer(conn io.ReadWriter) { + log.Println("Echo service started") io.Copy(conn, conn) } @@ -168,7 +170,7 @@ func main() { log.Println("WebSocket connection error:", err) return } - wsConn := iowrappers.NewWebSocketConn(conn) + wsConn := websocketutil.NewWebSocketConn(conn) defer wsConn.Close() listener, err := yamux.Server(wsConn, nil) @@ -176,14 +178,12 @@ func main() { panic(err) } - log.Println("Connection established to rendez-vous server, waiting for debug sessions") - + // Need to create listener implementation that aactually listens for websocket connections. var service AgentService service = ListenerServer(func() *ssh.Server { return sshServer("hostkey.pem") }) + //service = ConnectionServer(netCatServer) //service = ConnectionServer(echoServer) - //service := ConnectionServer(netCatServer) service.Run(listener) - } diff --git a/cmd/converge/server.go b/cmd/converge/server.go index ed0524a..5f4cfcd 100644 --- a/cmd/converge/server.go +++ b/cmd/converge/server.go @@ -2,10 +2,10 @@ package main import ( "cidebug/pkg/converge" - "cidebug/pkg/iowrappers" "cidebug/pkg/websocketutil" "fmt" "log" + "net" "net/http" "regexp" ) @@ -23,7 +23,7 @@ func main() { admin := converge.NewAdmin() registrationService := websocketutil.WebSocketService{ - Handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) { + Handler: func(w http.ResponseWriter, r *http.Request, conn net.Conn) { publicId, err := parsePublicId(r.URL.Path) if err != nil { log.Printf("Cannot parse public id from url: '%v'\n", err) @@ -37,7 +37,7 @@ func main() { }, } clientService := websocketutil.WebSocketService{ - Handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) { + Handler: func(w http.ResponseWriter, r *http.Request, conn net.Conn) { publicId, err := parsePublicId(r.URL.Path) if err != nil { log.Printf("Cannot parse public id from url: '%v'\n", err) diff --git a/cmd/tcptows/tcptows.go b/cmd/tcptows/tcptows.go index 68511e0..c0a72c8 100644 --- a/cmd/tcptows/tcptows.go +++ b/cmd/tcptows/tcptows.go @@ -2,6 +2,7 @@ package main import ( "cidebug/pkg/iowrappers" + "cidebug/pkg/websocketutil" "github.com/gorilla/websocket" "log" "net" @@ -24,7 +25,7 @@ func handleConnection(conn net.Conn, wsURL string) { log.Println("WebSocket connection error:", err) return } - wsConn := iowrappers.NewWebSocketConn(_wsConn) + wsConn := websocketutil.NewWebSocketConn(_wsConn) defer wsConn.Close() iowrappers.SynchronizeStreams(wsConn, conn) diff --git a/cmd/wstotcp/wstotcp.go b/cmd/wstotcp/wstotcp.go index 7c81b79..c22414f 100644 --- a/cmd/wstotcp/wstotcp.go +++ b/cmd/wstotcp/wstotcp.go @@ -2,6 +2,7 @@ package main import ( "cidebug/pkg/iowrappers" + "cidebug/pkg/websocketutil" "fmt" "github.com/gorilla/websocket" "log" @@ -34,7 +35,7 @@ func main() { func handleWebSocket(w http.ResponseWriter, r *http.Request, tcpConn net.Conn) { conn, err := upgrader.Upgrade(w, r, nil) - wsConn := iowrappers.NewWebSocketConn(conn) + wsConn := websocketutil.NewWebSocketConn(conn) if err != nil { log.Println("Error upgrading to WebSocket:", err) return diff --git a/pkg/converge/admin.go b/pkg/converge/admin.go index a4241ec..b53d9cf 100644 --- a/pkg/converge/admin.go +++ b/pkg/converge/admin.go @@ -34,7 +34,8 @@ func NewAgent(publicId string, } } -func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser, agentConn net.Conn) *Client { +func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser, + agentConn net.Conn) *Client { return &Client{ publicId: publicId, agent: agentConn, @@ -105,10 +106,13 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAd // we should setup on-demend connections ot agents later. return nil, fmt.Errorf("No agent found for publicId '%s'", publicId) } + agentConn, err := agent.clientSession.Open() if err != nil { return nil, err } + log.Println("Successful websocket connection to agent") + client := NewClient(publicId, clientConn, agentConn) admin.clients = append(admin.clients, client) admin.logStatus() @@ -180,6 +184,7 @@ func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser admin.RemoveClient(client) }() log.Printf("Connecting client and agent: '%s'\n", publicId) + iowrappers.SynchronizeStreams(client.client, client.agent) return nil } diff --git a/pkg/iowrappers/io.go b/pkg/iowrappers/io.go index 5cbcc72..a215521 100644 --- a/pkg/iowrappers/io.go +++ b/pkg/iowrappers/io.go @@ -1,53 +1,12 @@ package iowrappers import ( - "github.com/gorilla/websocket" "io" "net" ) -type WebSocketConn struct { - conn *websocket.Conn - buf []byte -} - type ReadWriteAddrCloser interface { io.ReadWriteCloser RemoteAddr() net.Addr } - -func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) { - if len(websocketConn.buf) == 0 { - _, message, err := websocketConn.conn.ReadMessage() - if err != nil { - return 0, err - } - websocketConn.buf = message - } - - n = copy(p, websocketConn.buf) - websocketConn.buf = websocketConn.buf[n:] - - return n, err -} - -func NewWebSocketConn(conn *websocket.Conn) *WebSocketConn { - return &WebSocketConn{conn: conn} -} - -func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) { - err = websocketConn.conn.WriteMessage(websocket.BinaryMessage, p) - if err == nil { - n = len(p) - } - return n, err -} - -func (websocketConn *WebSocketConn) Close() error { - return websocketConn.conn.Close() -} - -func (websocketConn *WebSocketConn) RemoteAddr() net.Addr { - return websocketConn.conn.RemoteAddr() -} diff --git a/pkg/websocketutil/connections.go b/pkg/websocketutil/connections.go new file mode 100644 index 0000000..ea87213 --- /dev/null +++ b/pkg/websocketutil/connections.go @@ -0,0 +1,77 @@ +package websocketutil + +import ( + "github.com/gorilla/websocket" + "net" + "time" +) + +type WebSocketConn struct { + conn *websocket.Conn + buf []byte +} + +func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) { + if len(websocketConn.buf) == 0 { + _, message, err := websocketConn.conn.ReadMessage() + if err != nil { + return 0, err + } + websocketConn.buf = message + } + + n = copy(p, websocketConn.buf) + websocketConn.buf = websocketConn.buf[n:] + + return n, err +} + +func NewWebSocketConn(conn *websocket.Conn) *WebSocketConn { + return &WebSocketConn{conn: conn} +} + +func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) { + err = websocketConn.conn.WriteMessage(websocket.BinaryMessage, p) + if err == nil { + n = len(p) + } + return n, err +} + +func (websocketConn *WebSocketConn) Close() error { + return websocketConn.conn.Close() +} + +func (WebSocketConn *WebSocketConn) LocalAddr() net.Addr { + return WebSocketConn.conn.LocalAddr() +} + +func (websocketConn *WebSocketConn) RemoteAddr() net.Addr { + return websocketConn.conn.RemoteAddr() +} + +func (websocketConn *WebSocketConn) SetDeadline(t time.Time) error { + return nil +} + +func (websocketConn *WebSocketConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (websocketConn *WebSocketConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func ConnectWebSocket(conn net.Conn, urlStr string) (net.Conn, error) { + dialer := *websocket.DefaultDialer + dialer.NetDial = func(network, addr string) (net.Conn, error) { + return conn, nil + } + + wsConn, _, err := dialer.Dial(urlStr, nil) + if err != nil { + return nil, err + } + + return NewWebSocketConn(wsConn), nil +} diff --git a/pkg/websocketutil/listener.go b/pkg/websocketutil/listener.go new file mode 100644 index 0000000..ef82ea7 --- /dev/null +++ b/pkg/websocketutil/listener.go @@ -0,0 +1,34 @@ +package websocketutil + +import ( + "log" + "net" +) + +type WebSocketListener struct { + connections chan net.Conn +} + +func NewWebSocketListener() WebSocketListener { + return WebSocketListener{ + connections: make(chan net.Conn), + } +} + +func (listener WebSocketListener) Accept() (net.Conn, error) { + conn := <-listener.connections + log.Printf("Got client connection: %v\n", conn) + return conn, nil +} + +func (listener *WebSocketListener) NewConnection(conn net.Conn) { + listener.connections <- conn +} + +func (listener WebSocketListener) Close() error { + return nil +} + +func (listener WebSocketListener) Addr() net.Addr { + return WebSocketAddr("rendez-vous") +} diff --git a/pkg/websocketutil/services.go b/pkg/websocketutil/services.go index 3fe5162..37efbd5 100644 --- a/pkg/websocketutil/services.go +++ b/pkg/websocketutil/services.go @@ -1,32 +1,41 @@ package websocketutil import ( - "cidebug/pkg/iowrappers" "github.com/gorilla/websocket" "log" + "net" "net/http" ) +type WebSocketAddr string + +func (r WebSocketAddr) Network() string { + return "websocket" +} +func (r WebSocketAddr) String() string { + return string(r) +} + var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } func handleWebSocket(w http.ResponseWriter, r *http.Request, - handler func(w http.ResponseWriter, r *http.Request, websockerConnection iowrappers.ReadWriteAddrCloser)) { + handler func(w http.ResponseWriter, r *http.Request, websockerConnection net.Conn)) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println("Error upgrading to WebSocket:", err) return } - wsConn := iowrappers.NewWebSocketConn(conn) + wsConn := NewWebSocketConn(conn) defer wsConn.Close() handler(w, r, wsConn) } type WebSocketService struct { - Handler func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) + Handler func(w http.ResponseWriter, r *http.Request, conn net.Conn) } func (endpoint *WebSocketService) Handle(w http.ResponseWriter, r *http.Request) {