From 41403476c6a631619def18a8a8a4e3957f9942da Mon Sep 17 00:00:00 2001
From: Erik Brakkee <erik@brakkee.org>
Date: Sat, 20 Jul 2024 13:35:49 +0200
Subject: [PATCH] working server * administration appears coorect * multiple
 clients for one agent * logging of active connections * simple echo server on
 the agent.

---
 cmd/agent/agent.go     |  29 +++++++-
 cmd/converge/server.go | 160 ++++++++++++++++++++++++++---------------
 go.mod                 |   1 +
 go.sum                 |   2 +
 pkg/iowrappers/io.go   |  20 +++++-
 pkg/iowrappers/sync.go |   4 +-
 6 files changed, 150 insertions(+), 66 deletions(-)

diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go
index 07c650b..ba81480 100755
--- a/cmd/agent/agent.go
+++ b/cmd/agent/agent.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/creack/pty"
 	"github.com/gliderlabs/ssh"
+	"github.com/hashicorp/yamux"
 	"github.com/pkg/sftp"
 )
 
@@ -113,7 +114,31 @@ func main() {
 	wsConn := iowrappers.NewWebSocketConn(conn)
 	defer wsConn.Close()
 
-	// echo server
-	iowrappers.SynchronizeStreams(wsConn, wsConn)
+	listener, err := yamux.Client(wsConn, nil)
+	if err != nil {
+		panic(err)
+	}
+
+	log.Println("Connection established to rendez-vous server, waiting for debug sessions")
+
+	// Session is a listener
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			panic(err)
+		}
+		go handleConnection(conn)
+	}
 
 }
+
+func handleConnection(conn io.ReadWriter) {
+	//stdio := bufio.NewReadWriter(
+	//	bufio.NewReaderSize(os.Stdin, 0),
+	//	bufio.NewWriterSize(os.Stdout, 0))
+
+	// echo server
+	io.Copy(conn, conn)
+
+	//iowrappers.SynchronizeStreams(conn, stdio)
+}
diff --git a/cmd/converge/server.go b/cmd/converge/server.go
index 2316cde..14b0d5a 100644
--- a/cmd/converge/server.go
+++ b/cmd/converge/server.go
@@ -4,10 +4,14 @@ import (
 	"cidebug/pkg/iowrappers"
 	"fmt"
 	"github.com/gorilla/websocket"
+	"github.com/hashicorp/yamux"
+	"io"
 	"log"
+	"net"
 	"net/http"
 	"regexp"
 	"sync"
+	"time"
 )
 
 var upgrader = websocket.Upgrader{
@@ -16,7 +20,7 @@ var upgrader = websocket.Upgrader{
 }
 
 func handleWebSocket(w http.ResponseWriter, r *http.Request,
-	handler func(w http.ResponseWriter, r *http.Request, websockerConnection *iowrappers.WebSocketConn)) {
+	handler func(w http.ResponseWriter, r *http.Request, websockerConnection iowrappers.ReadWriteAddrCloser)) {
 	conn, err := upgrader.Upgrade(w, r, nil)
 	if err != nil {
 		log.Println("Error upgrading to WebSocket:", err)
@@ -29,7 +33,7 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request,
 }
 
 type WebSocketService struct {
-	handler func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn)
+	handler func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser)
 }
 
 func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request) {
@@ -37,72 +41,106 @@ func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request)
 }
 
 type Agent struct {
-	agent          *iowrappers.WebSocketConn
-	publicId       string
-	client         *iowrappers.WebSocketConn
-	agentAvailable chan bool
-	clientClosed   chan bool
+	// server session
+	clientSession *yamux.Session
+	publicId      string
+	startTime     time.Time
+}
+
+type Client struct {
+	publicId  string
+	agent     net.Conn
+	client    iowrappers.ReadWriteAddrCloser
+	startTime time.Time
 }
 
 func NewAgent(publicId string,
-	agentConn *iowrappers.WebSocketConn,
-	clientConn *iowrappers.WebSocketConn) *Agent {
+	agentSession *yamux.Session) *Agent {
 	return &Agent{
-		agent:          agentConn,
-		publicId:       publicId,
-		client:         clientConn,
-		agentAvailable: make(chan bool, 1),
-		clientClosed:   make(chan bool, 1),
+		clientSession: agentSession,
+		publicId:      publicId,
+		startTime:     time.Now(),
+	}
+}
+
+func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser, agentConn net.Conn) *Client {
+	return &Client{
+		publicId:  publicId,
+		agent:     agentConn,
+		client:    clientConn,
+		startTime: time.Now(),
 	}
 }
 
 type Admin struct {
 	// map of public id to agent
-	mutex  sync.Mutex
-	agents map[string]*Agent
+	mutex   sync.Mutex
+	agents  map[string]*Agent
+	clients []*Client
 }
 
 func NewAdmin() *Admin {
 	admin := Admin{
-		mutex:  sync.Mutex{},
-		agents: make(map[string]*Agent),
+		mutex:   sync.Mutex{},
+		agents:  make(map[string]*Agent),
+		clients: make([]*Client, 0), // not strictly needed
 	}
 	return &admin
 }
 
-func (admin *Admin) addAgent(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) {
+func (admin *Admin) logStatus() {
+	log.Printf("%-20s %-20s %-20s\n", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS")
+	for _, agent := range admin.agents {
+		agent.clientSession.RemoteAddr()
+		log.Printf("%-20s %-20s %-20s\n", agent.publicId,
+			agent.startTime.Format("2006-01-02 15:04:05"),
+			agent.clientSession.RemoteAddr().String())
+	}
+	log.Println("")
+	log.Printf("%-20s %-20s %-20s\n", "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS")
+	for _, client := range admin.clients {
+		log.Printf("%-20s %-20s %-20s", client.publicId,
+			client.startTime.Format("2006-01-02 15:04:05"),
+			client.client.RemoteAddr())
+	}
+	log.Printf("\n")
+}
+
+func (admin *Admin) addAgent(publicId string, conn io.ReadWriteCloser) (*Agent, error) {
 	admin.mutex.Lock()
 	defer admin.mutex.Unlock()
 
 	agent := admin.agents[publicId]
-	if agent != nil && agent.agent != nil && agent.agent != conn {
+	if agent != nil {
 		return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId)
 	}
-	if agent != nil {
-		agent.agent = conn
-		return agent, nil
+	session, err := yamux.Client(conn, nil)
+	if err != nil {
+		return nil, err
 	}
-	agent = NewAgent(publicId, conn, nil)
+	agent = NewAgent(publicId, session)
 	admin.agents[publicId] = agent
+	admin.logStatus()
 	return agent, nil
 }
 
-func (admin *Admin) addClient(publicId string, conn *iowrappers.WebSocketConn) (*Agent, error) {
+func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser) (*Client, error) {
 	admin.mutex.Lock()
 	defer admin.mutex.Unlock()
 
 	agent := admin.agents[publicId]
-	if agent != nil && agent.client != nil && agent.client != conn {
+	if agent == nil {
 		// we should setup on-demend connections ot agents later.
-		return nil, fmt.Errorf("A different client with same publicId '%s' already registered", publicId)
+		return nil, fmt.Errorf("No agent found for publicId '%s'", publicId)
 	}
-	if agent != nil {
-		agent.client = conn
-		return agent, nil
+	agentConn, err := agent.clientSession.Open()
+	if err != nil {
+		return nil, err
 	}
-	agent = NewAgent(publicId, nil, conn)
-	admin.agents[publicId] = agent
-	return agent, nil
+	client := NewClient(publicId, clientConn, agentConn)
+	admin.clients = append(admin.clients, client)
+	admin.logStatus()
+	return client, nil
 }
 
 func (admin *Admin) RemoveAgent(publicId string) error {
@@ -113,32 +151,39 @@ func (admin *Admin) RemoveAgent(publicId string) error {
 	if agent == nil {
 		return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
 	}
-	agent.agent = nil
-	if agent.client == nil {
-		log.Printf("Removing agent: '%s'", publicId)
-		delete(admin.agents, publicId)
+	log.Printf("Removing agent: '%s'", publicId)
+	err := agent.clientSession.Close()
+	if err != nil {
+		log.Printf("Could not close yamux client session for '%s'\n", publicId)
 	}
+	delete(admin.agents, publicId)
+	admin.logStatus()
 	return nil
 }
 
-func (admin *Admin) RemoveClient(publicId string) error {
+func (admin *Admin) RemoveClient(client *Client) error {
 	admin.mutex.Lock()
 	defer admin.mutex.Unlock()
 
-	agent := admin.agents[publicId]
-	if agent == nil {
-		return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
-	}
-	agent.client = nil
-	if agent.agent == nil {
-		log.Printf("Removing client: '%s'", publicId)
-		delete(admin.agents, publicId)
+	log.Printf("Removing client: '%s' created at %s\n", client.publicId,
+		client.startTime.Format("2006-01-02 15:04:05"))
+	// try to explicitly close connection to the agent.
+	_ = client.agent.Close()
+	_ = client.client.Close()
+
+	for i, _client := range admin.clients {
+		if _client == _client {
+			admin.clients = append(admin.clients[:i], admin.clients[i+1:]...)
+			break
+		}
 	}
+	admin.logStatus()
 	return nil
 }
 
-func (admin *Admin) Register(publicId string, conn *iowrappers.WebSocketConn) error {
+func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error {
 	defer conn.Close()
+	// TODO: remove agent return value
 	agent, err := admin.addAgent(publicId, conn)
 	if err != nil {
 		return err
@@ -146,26 +191,24 @@ func (admin *Admin) Register(publicId string, conn *iowrappers.WebSocketConn) er
 	defer func() {
 		admin.RemoveAgent(publicId)
 	}()
-	log.Printf("After defer remove agent\n")
-	agent.agentAvailable <- true
 	log.Printf("Agent registered: '%s'\n", publicId)
-	<-agent.clientClosed
+	for !agent.clientSession.IsClosed() {
+		time.Sleep(1 * time.Second)
+	}
 	return nil
 }
 
-func (admin *Admin) Connect(publicId string, conn *iowrappers.WebSocketConn) error {
+func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser) error {
 	defer conn.Close()
-	agent, err := admin.addClient(publicId, conn)
+	client, err := admin.addClient(publicId, conn)
 	if err != nil {
 		return err
 	}
 	defer func() {
-		admin.RemoveClient(publicId)
+		admin.RemoveClient(client)
 	}()
-	<-agent.agentAvailable
 	log.Printf("Connecting client and agent: '%s'\n", publicId)
-	iowrappers.SynchronizeStreams(agent.client, agent.agent)
-	agent.clientClosed <- true
+	iowrappers.SynchronizeStreams(client.client, client.agent)
 	return nil
 }
 
@@ -184,14 +227,13 @@ func parsePublicId(path string) (publicId string, _ error) {
 		return "", fmt.Errorf("Invalid URL path '%s'", path)
 	}
 	return matches[1], nil
-
 }
 
 func main() {
 
 	admin := NewAdmin()
 	registrationService := WebSocketService{
-		handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) {
+		handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) {
 			publicId, err := parsePublicId(r.URL.Path)
 			if err != nil {
 				log.Printf("Cannot parse public id from url: '%v'\n", err)
@@ -205,7 +247,7 @@ func main() {
 		},
 	}
 	clientService := WebSocketService{
-		handler: func(w http.ResponseWriter, r *http.Request, conn *iowrappers.WebSocketConn) {
+		handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) {
 			publicId, err := parsePublicId(r.URL.Path)
 			if err != nil {
 				log.Printf("Cannot parse public id from url: '%v'\n", err)
diff --git a/go.mod b/go.mod
index 2f22ceb..40ce1fe 100755
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/creack/pty v1.1.21
 	github.com/gliderlabs/ssh v0.3.7
 	github.com/gorilla/websocket v1.5.3
+	github.com/hashicorp/yamux v0.1.1
 	github.com/pkg/sftp v1.13.6
 	golang.org/x/crypto v0.25.0
 	golang.org/x/term v0.22.0
diff --git a/go.sum b/go.sum
index 4762ea3..b17c2df 100755
--- a/go.sum
+++ b/go.sum
@@ -9,6 +9,8 @@ github.com/gliderlabs/ssh v0.3.7 h1:iV3Bqi942d9huXnzEF2Mt+CY9gLu8DNM4Obd+8bODRE=
 github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8=
 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
+github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
 github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo=
diff --git a/pkg/iowrappers/io.go b/pkg/iowrappers/io.go
index 68ca433..5cbcc72 100644
--- a/pkg/iowrappers/io.go
+++ b/pkg/iowrappers/io.go
@@ -1,14 +1,20 @@
 package iowrappers
 
-import "github.com/gorilla/websocket"
+import (
+	"github.com/gorilla/websocket"
+	"io"
+	"net"
+)
 
 type WebSocketConn struct {
 	conn *websocket.Conn
 	buf  []byte
 }
 
-func NewWebSocketConn(conn *websocket.Conn) *WebSocketConn {
-	return &WebSocketConn{conn: conn}
+type ReadWriteAddrCloser interface {
+	io.ReadWriteCloser
+
+	RemoteAddr() net.Addr
 }
 
 func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) {
@@ -26,6 +32,10 @@ func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) {
 	return n, err
 }
 
+func NewWebSocketConn(conn *websocket.Conn) *WebSocketConn {
+	return &WebSocketConn{conn: conn}
+}
+
 func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) {
 	err = websocketConn.conn.WriteMessage(websocket.BinaryMessage, p)
 	if err == nil {
@@ -37,3 +47,7 @@ func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) {
 func (websocketConn *WebSocketConn) Close() error {
 	return websocketConn.conn.Close()
 }
+
+func (websocketConn *WebSocketConn) RemoteAddr() net.Addr {
+	return websocketConn.conn.RemoteAddr()
+}
diff --git a/pkg/iowrappers/sync.go b/pkg/iowrappers/sync.go
index 194471a..2f0437f 100644
--- a/pkg/iowrappers/sync.go
+++ b/pkg/iowrappers/sync.go
@@ -14,7 +14,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
 		}()
 		_, err := io.Copy(stream1, stream2)
 		if err != nil {
-			log.Printf("error %v\n", err)
+			log.Printf("sync streams error(1) %v\n", err)
 		}
 	}()
 
@@ -23,7 +23,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
 			waitChannel <- true
 		}()
 		_, err := io.Copy(stream2, stream1)
-		log.Printf("Error %v\n", err)
+		log.Printf("sync streams error(2) %v\n", err)
 	}()
 
 	<-waitChannel