From 882f97fa17d26e21d559a1c9f7c8684b211b0d68 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Wed, 31 Jul 2024 19:30:38 +0200 Subject: [PATCH] many small changes * removed the Async utility * now using Ping message to webclient for keep alive instaed of actual content * added remote shell to AgentInfo * retry of connections to the agent * better logging for SynchronizeStreams --- cmd/agent/agent.go | 31 +++++++++++------------- cmd/tcptows/tcptows.go | 2 +- cmd/wsproxy/wsproxy.go | 2 +- cmd/wstotcp/wstotcp.go | 2 +- pkg/agent/session/session.go | 13 ++++++---- pkg/comms/events.go | 4 ++- pkg/server/converge/admin.go | 17 +++++++++++-- pkg/server/converge/websessions.go | 8 +++--- pkg/server/templates/sessions.templ | 2 +- pkg/server/templates/usage.templ | 9 +++---- pkg/support/async/async.go | 18 -------------- pkg/support/iowrappers/sync.go | 8 +++--- pkg/support/websocketutil/connections.go | 5 +++- 13 files changed, 61 insertions(+), 60 deletions(-) delete mode 100644 pkg/support/async/async.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index 0401383..dcb28b0 100755 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -74,7 +74,7 @@ func sshServer(hostKeyFile string, shellCommand string, s.LocalAddr().String(), "ssh", ) session.Login(sessionInfo, s) - iowrappers.SynchronizeStreams(process.Pipe(), s) + iowrappers.SynchronizeStreams("shell -- ssh", process.Pipe(), s) session.LogOut(sessionInfo.ClientId) // will cause addition goroutines to remmain alive when the SSH // session is killed. For now acceptable since the agent is a short-lived @@ -108,7 +108,7 @@ func netCatServer(conn io.ReadWriter) { stdio := bufio.NewReadWriter( bufio.NewReaderSize(os.Stdin, 0), bufio.NewWriterSize(os.Stdout, 0)) - iowrappers.SynchronizeStreams(conn, stdio) + iowrappers.SynchronizeStreams("stdio -- ws", conn, stdio) } type AgentService interface { @@ -166,15 +166,15 @@ func printHelp(msg string) { "Here is the unique id of the agent that allows rendez-vous with an end-user.\n" + "The end-user must specify the same id when connecting using ssh.\n" + "\n" + - "--id: rendez-vous id. When specified an SSH authorized key must be used and password\n" + - " based access is disabled. When not specified a random id is chosen by the agent and\n" + - " password based access is possible. The password is configured on the converge server\n" + - "--ssh-keys-file: SSH authorized keys file in openssh format. By default .authorized_keys in the\n" + - " directory where the agent is started is used.\n" + - "--warning-time: advance warning time before sessio ends\n" + - "--expiry-time: expiry time of the session\n" + - "--check-interval: interval at which expiry is checked\n" + - "-insecure: allow invalid certificates\n" + "--id: rendez-vous id. When specified an SSH authorized key must be used and password\n" + + " based access is disabled. When not specified a random id is chosen by the agent and\n" + + " password based access is possible. The password is configured on the converge server\n" + + "--authorized-keys: SSH authorized keys file in openssh format. By default .authorized_keys in the\n" + + " directory where the agent is started is used.\n" + + "--warning-time: advance warning time before sessio ends\n" + + "--expiry-time: expiry time of the session\n" + + "--check-interval: interval at which expiry is checked\n" + + "-insecure: allow invalid certificates\n" fmt.Fprintln(os.Stderr, helpText) os.Exit(1) @@ -218,7 +218,7 @@ func main() { switch args[0] { case "--id": id, args = getArg(args) - case "--ssh-keys-file": + case "--authorized-keys": authorizedKeysFile, args = getArg(args) case "--warning-time": advanceWarningTime, args = parseDuration(args, val) @@ -266,7 +266,8 @@ func main() { wsConn := websocketutil.NewWebSocketConn(conn, false) defer wsConn.Close() - serverInfo, err := comms.AgentInitialization(wsConn, comms.NewAgentInfo()) + shell := chooseShell() + serverInfo, err := comms.AgentInitialization(wsConn, comms.NewAgentInfo(shell)) if err != nil { log.Printf("ERROR: %v", err) os.Exit(1) @@ -300,10 +301,6 @@ func main() { serverInfo.UserPassword, authorizedKeysFile) - // Choose shell - - shell := chooseShell() - var service AgentService service = ListenerServer(func() *ssh.Server { diff --git a/cmd/tcptows/tcptows.go b/cmd/tcptows/tcptows.go index 9d95e35..24ad3ad 100644 --- a/cmd/tcptows/tcptows.go +++ b/cmd/tcptows/tcptows.go @@ -39,7 +39,7 @@ func handleConnection(conn net.Conn, wsURL string, insecure bool) { wsConn := websocketutil.NewWebSocketConn(_wsConn, false) defer wsConn.Close() - iowrappers.SynchronizeStreams(wsConn, conn) + iowrappers.SynchronizeStreams(wsURL+" -- localport", wsConn, conn) } func main() { diff --git a/cmd/wsproxy/wsproxy.go b/cmd/wsproxy/wsproxy.go index e574ed0..8ad123d 100644 --- a/cmd/wsproxy/wsproxy.go +++ b/cmd/wsproxy/wsproxy.go @@ -73,5 +73,5 @@ func main() { wsConn := websocketutil.NewWebSocketConn(_wsConn, false) defer wsConn.Close() - iowrappers.SynchronizeStreams(wsConn, Stdio{}) + iowrappers.SynchronizeStreams(wsURL+" -- stdio", wsConn, Stdio{}) } diff --git a/cmd/wstotcp/wstotcp.go b/cmd/wstotcp/wstotcp.go index b586f7f..b6c6774 100644 --- a/cmd/wstotcp/wstotcp.go +++ b/cmd/wstotcp/wstotcp.go @@ -42,5 +42,5 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request, tcpConn net.Conn) { } defer wsConn.Close() - iowrappers.SynchronizeStreams(tcpConn, wsConn) + iowrappers.SynchronizeStreams("tcp -- ws", tcpConn, wsConn) } diff --git a/pkg/agent/session/session.go b/pkg/agent/session/session.go index 0bc3582..124c47c 100644 --- a/pkg/agent/session/session.go +++ b/pkg/agent/session/session.go @@ -3,7 +3,6 @@ package session import ( "bytes" "converge/pkg/comms" - "converge/pkg/support/async" "fmt" "github.com/fsnotify/fsnotify" "github.com/gliderlabs/ssh" @@ -107,7 +106,7 @@ func ConfigureAgent(commChannel comms.CommChannel, go func() { for { <-state.ticker.C - events <- async.Async(check) + events <- check } }() go monitorHoldFile() @@ -121,11 +120,15 @@ func ConfigureAgent(commChannel comms.CommChannel, } func Login(sessionInfo comms.SessionInfo, sshSession ssh.Session) { - events <- async.Async(login, sessionInfo, sshSession) + events <- func() { + login(sessionInfo, sshSession) + } } func LogOut(clientId string) { - events <- async.Async(logOut, clientId) + events <- func() { + logOut(clientId) + } } // Internal interface synchronous @@ -148,7 +151,7 @@ func monitorHoldFile() { } base := filepath.Base(event.Name) if base == holdFilename { - events <- async.Async(holdFileChange) + events <- holdFileChange } case err, ok := <-watcher.Errors: diff --git a/pkg/comms/events.go b/pkg/comms/events.go index 28877ef..6f21486 100644 --- a/pkg/comms/events.go +++ b/pkg/comms/events.go @@ -21,6 +21,7 @@ type AgentInfo struct { Hostname string Pwd string OS string + Shell string } type ClientInfo struct { @@ -72,7 +73,7 @@ type ConvergeMessage struct { Value interface{} } -func NewAgentInfo() AgentInfo { +func NewAgentInfo(shell string) AgentInfo { username, _ := user.Current() host, _ := os.Hostname() pwd, _ := os.Getwd() @@ -81,6 +82,7 @@ func NewAgentInfo() AgentInfo { Hostname: host, Pwd: pwd, OS: runtime.GOOS, + Shell: shell, } } diff --git a/pkg/server/converge/admin.go b/pkg/server/converge/admin.go index 0dbf315..25a5bff 100644 --- a/pkg/server/converge/admin.go +++ b/pkg/server/converge/admin.go @@ -189,11 +189,12 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteA return nil, fmt.Errorf("No agent found for PublicId '%s'", publicId) } - agentConn, err := agent.commChannel.Session.Open() + agentConn, err := admin.getAgentConnection(agent) if err != nil { return nil, err } log.Println("Successful websocket connection to agent") + log.Println("Sending connection information to agent") client := NewClient(publicId, clientConn, agentConn) @@ -212,6 +213,18 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteA return client, nil } +func (admin *Admin) getAgentConnection(agent *AgentConnection) (net.Conn, error) { + agentConn, err := agent.commChannel.Session.Open() + count := 0 + for err != nil && count < 10 { + log.Printf("Retrying connection to agent: %v", err) + time.Sleep(250 * time.Millisecond) + count++ + agentConn, err = agent.commChannel.Session.Open() + } + return agentConn, err +} + func (admin *Admin) RemoveAgent(publicId string) error { admin.mutex.Lock() defer admin.mutex.Unlock() @@ -312,6 +325,6 @@ func (admin *Admin) Connect(publicId string, conn iowrappers2.ReadWriteAddrClose }() log.Printf("Connecting client and agent: '%s'\n", publicId) - iowrappers2.SynchronizeStreams(client.client, client.agent) + iowrappers2.SynchronizeStreams("client -- agent", client.client, client.agent) return nil } diff --git a/pkg/server/converge/websessions.go b/pkg/server/converge/websessions.go index 203819e..673bf2f 100644 --- a/pkg/server/converge/websessions.go +++ b/pkg/server/converge/websessions.go @@ -56,6 +56,8 @@ func (sessions *WebSessions) NewSession(wsConnection net.Conn) *WebSession { } func (session *WebSession) WriteNotifications() { + timer := time.NewTicker(10 * time.Second) + defer timer.Stop() for { select { case notification, ok := <-session.notifications: @@ -73,8 +75,8 @@ func (session *WebSession) WriteNotifications() { log.Printf("WS connection closed: %v", err) return } - case <-time.After(10 * time.Second): - _, err := session.conn.Write([]byte("
ping
")) + case <-timer.C: + _, err := session.conn.Write(make([]byte, 0, 0)) if err != nil { log.Printf("WS connection closed: %v", err) return @@ -92,5 +94,5 @@ func (sessions *WebSessions) SessionClosed(session *WebSession) { } func (sessions *WebSessions) logSessions() { - log.Printf("New web session, nsessions %d", len(sessions.sessions)) + log.Printf("Web session count %d", len(sessions.sessions)) } diff --git a/pkg/server/templates/sessions.templ b/pkg/server/templates/sessions.templ index ceac7cd..e743667 100644 --- a/pkg/server/templates/sessions.templ +++ b/pkg/server/templates/sessions.templ @@ -6,7 +6,7 @@ templ Sessions() {

sessions

- Initial content + Loading...
diff --git a/pkg/server/templates/usage.templ b/pkg/server/templates/usage.templ index 789f8d8..ce3616f 100644 --- a/pkg/server/templates/usage.templ +++ b/pkg/server/templates/usage.templ @@ -26,11 +26,10 @@ templ Usage(secure string, host string, username string) { Above, ID is a unique id for the job, the so-called rendez-cous ID. This should not conflict with IDs used by other agents. The ID is used for a rendez-vous between the end-user on a local system and the continuous integration job running on a build agent. If you don't specify an id, a random - id will be generated. - - The agent to the converge server and tells it the ID. Clients can now connect to the Converge - server to establish a connection to the CI job through converge by also specifying the same - ID. + id will be generated. If you specify a duplicate ID, the server will generate a new one andd the + agent will tell you what id to use. + Clients can now connect to the Converge server with the ID to establish a connection to + the CI job through Converge. Communication between end-user and agent is encrypted using SSH and the rendez-vous server is unable to diff --git a/pkg/support/async/async.go b/pkg/support/async/async.go deleted file mode 100644 index 5c0a1db..0000000 --- a/pkg/support/async/async.go +++ /dev/null @@ -1,18 +0,0 @@ -package async - -import "reflect" - -func Async(fn interface{}, args ...interface{}) func() { - fnValue := reflect.ValueOf(fn) - - // Prepare the arguments - params := make([]reflect.Value, len(args)) - for i, arg := range args { - params[i] = reflect.ValueOf(arg) - } - - // Return a function that, when called, will invoke the original function - return func() { - fnValue.Call(params) - } -} diff --git a/pkg/support/iowrappers/sync.go b/pkg/support/iowrappers/sync.go index 7d7667e..8a01a8f 100644 --- a/pkg/support/iowrappers/sync.go +++ b/pkg/support/iowrappers/sync.go @@ -5,7 +5,7 @@ import ( "log" ) -func SynchronizeStreams(stream1, stream2 io.ReadWriter) { +func SynchronizeStreams(description string, stream1, stream2 io.ReadWriter) { waitChannel := make(chan bool, 2) go func() { @@ -14,7 +14,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { }() _, err := io.Copy(stream1, stream2) if err != nil { - log.Printf("SynchronizeStreamms: error(1) %v\n", err) + log.Printf("SynchronizeStreams: %s: error <-: %v\n", description, err) } }() @@ -24,10 +24,10 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { }() _, err := io.Copy(stream2, stream1) if err != nil { - log.Printf("SynchronizeStreams: error(2) %v\n", err) + log.Printf("SynchronizeStreams: %s: error ->: %v\n", description, err) } }() <-waitChannel - log.Println("SynchronizeStreams: Connection closed") + log.Printf("SynchronizeStreams: %s: Connection closed", description) } diff --git a/pkg/support/websocketutil/connections.go b/pkg/support/websocketutil/connections.go index 0ac63f3..231c507 100644 --- a/pkg/support/websocketutil/connections.go +++ b/pkg/support/websocketutil/connections.go @@ -33,7 +33,10 @@ func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) { func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) { messageType := websocket.BinaryMessage - if websocketConn.text { + switch { + case len(p) == 0: + messageType = websocket.PingMessage + case websocketConn.text: messageType = websocket.TextMessage } err = websocketConn.conn.WriteMessage(messageType, p)