From 2ed81c3174d0489519f5336fee3e6f7b50104ef3 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Thu, 25 Jul 2024 19:51:11 +0200 Subject: [PATCH] communication between agent and server. Removed the flags libray for command-line parsing. Heartbeat mechanism from client to server over the custom connection for sending events to guarantee that the connectoin stays up. --- cmd/agent/agent.go | 135 +++++++++++++++++++++++++++++++-------- cmd/converge/server.go | 65 ++++++++++++------- cmd/tcptows/tcptows.go | 26 ++++---- cmd/wsproxy/wsproxy.go | 25 ++++---- pkg/agent/session.go | 14 ++-- pkg/comms/agentserver.go | 102 ++++++++++++++++++----------- pkg/comms/events.go | 78 ++++++++++++++++++++++ pkg/comms/serverapi.go | 20 ------ pkg/converge/admin.go | 95 +++++++++++++++++---------- static/index.html | 21 ++++-- 10 files changed, 404 insertions(+), 177 deletions(-) create mode 100644 pkg/comms/events.go delete mode 100644 pkg/comms/serverapi.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index 2261b0e..9a9ad7d 100755 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -8,19 +8,21 @@ import ( "converge/pkg/terminal" "converge/pkg/websocketutil" "crypto/tls" - "flag" "fmt" "github.com/gliderlabs/ssh" "github.com/gorilla/websocket" "github.com/pkg/sftp" "io" "log" + "math/rand" "net" "net/http" "net/url" "os" "os/exec" + "regexp" "runtime" + "strconv" "strings" "time" @@ -55,9 +57,11 @@ func SftpHandler(sess ssh.Session) { } } +var sshUserCredentials = comms.UserPassword{} + func passwordAuth(ctx ssh.Context, password string) bool { // Replace with your own logic to validate username and password - return ctx.User() == "abc" && password == "123" + return ctx.User() == sshUserCredentials.Username && password == sshUserCredentials.Password } func sshServer(hostKeyFile string, shellCommand string) *ssh.Server { @@ -135,35 +139,109 @@ func (f ReaderFunc) Read(p []byte) (n int, err error) { return f(p) } +func validateString(value, description, pattern string) { + matched, err := regexp.MatchString(pattern, value) + if err != nil || !matched { + printHelp(fmt.Sprintf("%s: wrong value '%s', must conform to pattern '%s'", + description, value, pattern)) + } +} + +func getId(id string) string { + if id == "" { + // not specified + return strconv.Itoa(time.Now().Nanosecond() % 1000000000) + } + validateString(id, "id", `^[a-zA-Z0-9-]+$`) + return id +} + +func printHelp(msg string) { + if msg != "" { + fmt.Fprintf(os.Stderr, "ERROR: %s\n\n", msg) + } + helpText := "agent [options] \n" + + "\n" + + "Run agent with of the form ws[s]://[:port]\n" + + "Here is the unique id of the agent that allows rendez-vous with an end-user.\n" + + "The end-user must specify the same id when connecting using ssh.\n" + + "\n" + + "--id: Rendez-vous id. When specified an SSH authorized key must be used and password\n" + + " based access is disabled. When not specified a random id is chosen by the agent and\n" + + " password based access is possible. The password is configured on the converge server\n" + + "--warning-time: advance warning time before sessio ends\n" + + "--expiry-time: expiry time of the session\n" + + "--check-interval: interval at which expiry is checked\n" + + "-insecure: allow invalid certificates\n" + + fmt.Fprintln(os.Stderr, helpText) + os.Exit(1) +} + +func getArg(args []string) (value string, ret []string) { + if len(args) < 2 { + printHelp(fmt.Sprintf("The '%s' option expects an argument", args[0])) + } + return args[1], args[1:] +} + +func parseDuration(args []string, val string) (time.Duration, []string) { + duration, err := time.ParseDuration(val) + if err != nil { + printHelp(fmt.Sprintf("Error parsing duration: %v\n", err)) + } + return duration, args[1:] +} + func main() { - usage := "agent [options] \n" + - "\n" + - "Run agent with of the form ws[s]://[:port]/agent/\n" + - "Here is the unique id of the agent that allows rendez-vous with an end-user.\n" + - "The end-user must specify the same id when connecting using ssh.\n" - - flag.Usage = func() { - fmt.Fprintf(os.Stderr, usage+"\n") - flag.PrintDefaults() + // Random user name and password so that effectively no one can login + // until the user and password have been received from the server. + sshUserCredentials = comms.UserPassword{ + Username: strconv.Itoa(rand.Int()), + Password: strconv.Itoa(rand.Int()), } - advanceWarningTime := flag.Duration("warning-time", 5*time.Minute, "advance warning time before sessio ends") - agentExpriryTime := flag.Duration("expiry-time", 10*time.Minute, "expiry time of the session") - tickerInterval := flag.Duration("check-interval", 60*time.Second, "interval at which expiry is checked") - insecure := flag.Bool("insecure", false, "allow invalid certificates") - flag.Parse() - if flag.NArg() != 1 { - flag.Usage() - os.Exit(1) + id := "" + advanceWarningTime := 5 * time.Minute + agentExpriryTime := 10 * time.Minute + tickerInterval := 60 * time.Second + insecure := false + + args := os.Args[1:] + for len(args) > 0 && strings.HasPrefix(args[0], "-") { + val := "" + switch args[0] { + case "--id": + id, args = getArg(args) + case "--warning-time": + advanceWarningTime, args = parseDuration(args, val) + case "--expiry-time": + agentExpriryTime, args = parseDuration(args, val) + case "--check-interval": + tickerInterval, args = parseDuration(args, val) + case "--insecure": + insecure = true + default: + printHelp("Unknown option " + args[0]) + } + args = args[1:] } - wsURL := flag.Arg(0) + + id = getId(id) + + if len(args) != 1 { + printHelp("") + } + validateString(args[0], "wsUrl", `^wss?://[a-zA-Z0-9]*(:[0-9]+)?$`) + + wsURL := args[0] + "/agent/" + id dialer := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, HandshakeTimeout: 45 * time.Second, } - if *insecure { + if insecure { dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } conn, _, err := dialer.Dial(wsURL, nil) @@ -179,6 +257,11 @@ func main() { panic(err) } + go comms.ListenForServerEvents(commChannel, func(user comms.UserPassword) { + log.Println("Username and password configuration received from server") + sshUserCredentials = user + }) + var service AgentService shells := []string{"bash", "sh", "ash", "ksh", "zsh", "fish", "tcsh", "csh"} if runtime.GOOS == "windows" { @@ -206,10 +289,10 @@ func main() { log.Printf("Clients should use the following commands to connect to this agent:") log.Println() clientUrl := strings.ReplaceAll(wsURL, "/agent/", "/client/") - sshCommand := fmt.Sprintf("ssh -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" abc@localhost", - clientUrl) - sftpCommand := fmt.Sprintf("sftp -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" abc@localhost", - clientUrl) + sshCommand := fmt.Sprintf("ssh -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" %s@localhost", + clientUrl, sshUserCredentials.Username) + sftpCommand := fmt.Sprintf("sftp -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" %s@localhost", + clientUrl, sshUserCredentials.Username) log.Println(" # For SSH") log.Println(" " + sshCommand) log.Println() @@ -223,6 +306,6 @@ func main() { "://"+urlObject.Host+"/docs/wsproxy") log.Println() - agent.ConfigureAgent(commChannel, *advanceWarningTime, *agentExpriryTime, *tickerInterval) + agent.ConfigureAgent(commChannel, advanceWarningTime, agentExpriryTime, tickerInterval) service.Run(commChannel.Session) } diff --git a/cmd/converge/server.go b/cmd/converge/server.go index 1014698..96138e1 100644 --- a/cmd/converge/server.go +++ b/cmd/converge/server.go @@ -3,13 +3,13 @@ package main import ( "converge/pkg/converge" "converge/pkg/websocketutil" - "flag" "fmt" "log" "net" "net/http" "os" "regexp" + "strings" ) func parsePublicId(path string) (publicId string, _ error) { @@ -26,32 +26,49 @@ func catchAllHandler(w http.ResponseWriter, r *http.Request) { return } -func main() { - downloadOption := flag.String("d", "downloads", - "directory where documentation is located, either relative to current directory or an absolute path") +func printHelp(msg string) { + if msg != "" { + fmt.Fprintf(os.Stderr, "ERROR: %s\n\n", msg) + } + helpText := "Usage: converge [options]\n" + + "\n" + + "Converge server is a rendez-vous server for debugging continuous integration\n" + + "jobs be providing the capability to log into the agents where jobs are running.\n" + + "This is achieve by starting an agent in the continuous integration job\n" + + "which connects to Converge using a websocket connection. The end user also connects\n" + + "to Converge using ssh over websockets. The server then matches the end-user with\n" + + "the agent running in the continous integration job (the rendez-vous) and sets up\n" + + "an end-to-end SSH connection between end-user and agent, with the agent providing\n" + + "an embedded SSH server to provide interactive access to the end-user. This works\n" + + "both on linux and on windows.\n" + + "\n" + + "-d : directory where static content of converge is placed" + fmt.Fprintln(os.Stderr, helpText) + os.Exit(1) +} - flag.Usage = func() { - fmt.Fprintln(os.Stderr, "Usage: converge [options]") - fmt.Fprintln(os.Stderr) - fmt.Fprintln(os.Stderr, "Converge server is a rendez-vous server for debugging continuous integration") - fmt.Fprintln(os.Stderr, "jobs be providing the capability to log into the agents where jobs are running.") - fmt.Fprintln(os.Stderr, "This is achieve by starting an agent in the continuous integration job") - fmt.Fprintln(os.Stderr, "which connects to Converge using a websocket connection. The end user also connects") - fmt.Fprintln(os.Stderr, "to Converge using ssh over websockets. The server then matches the end-user with") - fmt.Fprintln(os.Stderr, "the agent running in the continous integration job (the rendez-vous) and sets up") - fmt.Fprintln(os.Stderr, "an end-to-end SSH connection between end-user and agent, with the agent providing") - fmt.Fprintln(os.Stderr, "an embedded SSH server to provide interactive access to the end-user. This works") - fmt.Fprintln(os.Stderr, "both on linux and on windows.") - fmt.Fprintln(os.Stderr) - flag.PrintDefaults() +func main() { + downloadDir := "downloads" + + args := os.Args[1:] + for len(args) > 0 && strings.HasPrefix(args[0], "-") { + switch args[0] { + case "-d": + if len(args) <= 1 { + printHelp("The -d option expects an argument") + } + downloadDir = args[1] + args = args[1:] + default: + printHelp("Unknown option " + args[0]) + } + args = args[1:] } - flag.Parse() - if flag.NArg() != 0 { - flag.Usage() - os.Exit(1) + log.Println("Content directory", downloadDir) + + if len(args) != 0 { + printHelp("") } - downloadDir := *downloadOption - log.Println("Download directory", downloadDir) admin := converge.NewAdmin() registrationService := websocketutil.WebSocketService{ diff --git a/cmd/tcptows/tcptows.go b/cmd/tcptows/tcptows.go index 8160edc..b0e1738 100644 --- a/cmd/tcptows/tcptows.go +++ b/cmd/tcptows/tcptows.go @@ -4,7 +4,6 @@ import ( "converge/pkg/iowrappers" "converge/pkg/websocketutil" "crypto/tls" - "flag" "fmt" "github.com/gorilla/websocket" "log" @@ -44,23 +43,24 @@ func handleConnection(conn net.Conn, wsURL string, insecure bool) { } func main() { - usage := "Usage: tcptows [options] ws[s]://[:port]/client/\n" + + usage := "Usage: tcptows [--insecure] ws[s]://[:port]/client/\n" + "\n" + "Here is the rendez-vous id of a continuous integratio job\n" - insecure := flag.Bool("insecure", false, "allow invalid certificates") + insecure := false - flag.Usage = func() { - fmt.Fprintln(os.Stderr, usage) - flag.PrintDefaults() + args := os.Args[1:] + + if len(args) == 3 && args[0] == "--insecure" { + insecure = true + args = args[1:] } - flag.Parse() - fmt.Println("Narg ", flag.NArg()) - if flag.NArg() != 2 { - flag.Usage() + + if len(args) != 2 { + fmt.Fprintf(os.Stderr, usage) os.Exit(1) } - tcpPort := flag.Arg(0) - wsURL := flag.Arg(1) + tcpPort := args[0] + wsURL := args[1] listener, err := net.Listen("tcp", ":"+tcpPort) if err != nil { @@ -77,6 +77,6 @@ func main() { log.Println(err) continue } - go handleConnection(conn, wsURL, *insecure) + go handleConnection(conn, wsURL, insecure) } } diff --git a/cmd/wsproxy/wsproxy.go b/cmd/wsproxy/wsproxy.go index d1523e0..1ad271b 100644 --- a/cmd/wsproxy/wsproxy.go +++ b/cmd/wsproxy/wsproxy.go @@ -4,7 +4,6 @@ import ( "converge/pkg/iowrappers" "converge/pkg/websocketutil" "crypto/tls" - "flag" "fmt" "github.com/gorilla/websocket" "log" @@ -31,36 +30,36 @@ func (stdio Stdio) Write(b []byte) (n int, err error) { } func main() { - usage := "Usage: tcptows ws[s]://[:port]/client/\n\n" + + usage := "Usage: wsproxy [--insecure] ws[s]://[:port]/client/\n\n" + "\n" + "Here is the rendez-vous id of a continuous integration job\n" + "\n" + "Use this in an ssh command like this: \n" + "\n" + - " ssh -oProxyCommand='wsproxy ws[s]://[:port]/client/' abc@localhost\n" + + " ssh -oServerAliveInterval=10 -oProxyCommand='wsproxy ws[s]://[:port]/client/' abc@localhost\n" + "\n" + "This latssh connect through wsproxy tocalhost\n" - insecure := flag.Bool("insecure", false, "allow invalid certificates") + args := os.Args[1:] + insecure := false - flag.Usage = func() { - fmt.Fprintln(os.Stderr, usage) - flag.PrintDefaults() + if len(args) == 2 && args[0] == "--insecure" { + insecure = true + args = args[1:] } - flag.Parse() - log.Println("Narg ", flag.NFlag()) - if flag.NArg() != 1 { - flag.Usage() + + if len(args) != 1 { + fmt.Fprintf(os.Stderr, usage) os.Exit(1) } - wsURL := flag.Arg(0) + wsURL := args[0] dialer := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, HandshakeTimeout: 45 * time.Second, } - if *insecure { + if insecure { dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } _wsConn, _, err := dialer.Dial(wsURL, nil) diff --git a/pkg/agent/session.go b/pkg/agent/session.go index 77f031a..3bf0136 100644 --- a/pkg/agent/session.go +++ b/pkg/agent/session.go @@ -101,8 +101,8 @@ func ConfigureAgent(commChannel comms.CommChannel, log.Printf("Agent expires at %s", state.expiryTime(holdFilename).Format(time.DateTime)) - comms.SendSessionInfo(state.commChannel) - comms.SendExpiryTimeUpdate(state.commChannel, state.expiryTime(holdFilename)) + comms.Send(state.commChannel, comms.NewAgentInfo()) + comms.Send(state.commChannel, comms.NewExpiryTimeUpdate(state.expiryTime(holdFilename))) go func() { for { @@ -178,6 +178,12 @@ func login(sessionId int, sshSession ssh.Session) { log.Println("New login") hostname, _ := os.Hostname() + sessionType := sshSession.Subsystem() + if sessionType == "" { + sessionType = "ssh" + } + comms.Send(state.commChannel, comms.NewSessionInfo(sessionType)) + holdFileStats, ok := fileExistsWithStats(holdFilename) if ok { if holdFileStats.ModTime().After(time.Now()) { @@ -217,7 +223,7 @@ func formatHelpMessage() string { panic(err) } helpFormattedBuf := bytes.NewBuffer(make([]byte, 0)) - log.Println("Runnning on ", runtime.GOOS) + log.Println("Running on ", runtime.GOOS) data := map[string]string{"os": runtime.GOOS} err = templ.Execute(helpFormattedBuf, data) if err != nil { @@ -291,7 +297,7 @@ func holdFileChange() { message += holdFileMessage() messageUsers(message) state.lastExpiryTimmeReported = newExpiryTIme - comms.SendExpiryTimeUpdate(state.commChannel, state.lastExpiryTimmeReported) + comms.Send(state.commChannel, comms.NewExpiryTimeUpdate(state.expiryTime(holdFilename))) } } diff --git a/pkg/comms/agentserver.go b/pkg/comms/agentserver.go index d75dc75..69fa18b 100644 --- a/pkg/comms/agentserver.go +++ b/pkg/comms/agentserver.go @@ -7,8 +7,6 @@ import ( "io" "log" "net" - "os" - "os/user" "time" ) @@ -19,6 +17,12 @@ type CommChannel struct { Session *yamux.Session } +type AgentListener interface { + AgentInfo(agent AgentInfo) + SessionInfo(session SessionInfo) + ExpiryTimeUpdate(session ExpiryTimeUpdate) +} + type Role int const ( @@ -72,67 +76,89 @@ func NewCommChannel(role Role, wsConn io.ReadWriteCloser) (CommChannel, error) { } log.Println("Communication channel between agent and converge server established") - gob.Register(SessionInfo{}) - gob.Register(ExpiryTimeUpdate{}) - gob.Register(ConvergeMessage{}) - + RegisterEventsWithGob() commChannel.Encoder = gob.NewEncoder(commChannel.Peer) commChannel.Decoder = gob.NewDecoder(commChannel.Peer) - switch role { - case ConvergeServer: - go serverReader(commChannel) + // heartbeat + if role == Agent { + go func() { + for { + time.Sleep(10 * time.Second) + err := Send(commChannel, HeartBeat{}) + if err != nil { + log.Println("Sending heartbeat to server failed") + } + } + }() } return commChannel, nil } -func serverReader(channel CommChannel) { +// Sending an event to the other side + +func Send(commChannel CommChannel, object any) error { + err := commChannel.Encoder.Encode(ConvergeMessage{Value: object}) + if err != nil { + log.Printf("Encoding error %v", err) + } + return err +} + +func ListenForAgentEvents(channel CommChannel, + agentInfo func(agent AgentInfo), + sessionInfo func(session SessionInfo), + expiryTimeUpdate func(session ExpiryTimeUpdate)) { for { var result ConvergeMessage err := channel.Decoder.Decode(&result) if err != nil { // TODO more clean solution, need to explicitly close when agent exits. - log.Printf("Exiting serverReader %v", err) + log.Printf("Exiting agent listener %v", err) return } switch v := result.Value.(type) { + + case AgentInfo: + agentInfo(v) + case SessionInfo: - log.Println("RECEIVED: session info ", v) + sessionInfo(v) + case ExpiryTimeUpdate: - log.Println("RECEIVED: expirytime update ", v) + expiryTimeUpdate(v) + + case HeartBeat: + // for not ignoring, can also implement behavior + // when heartbeat not received but hearbeat is only + // intended to keep the connection up + default: fmt.Printf(" Unknown type: %T\n", v) } } } -type SessionInfo struct { - Username string - Hostname string - Pwd string -} +func ListenForServerEvents(channel CommChannel, + setUsernamePassword func(user UserPassword)) { + for { + var result ConvergeMessage + err := channel.Decoder.Decode(&result) -func NewSessionInfo() SessionInfo { - username, _ := user.Current() - host, _ := os.Hostname() - pwd, _ := os.Getwd() - return SessionInfo{ - Username: username.Username, - Hostname: host, - Pwd: pwd, + if err != nil { + // TODO more clean solution, need to explicitly close when agent exits. + log.Printf("Exiting agent listener %v", err) + return + } + switch v := result.Value.(type) { + + case UserPassword: + setUsernamePassword(v) + + default: + fmt.Printf(" Unknown type: %T\n", v) + } } } - -type ExpiryTimeUpdate struct { - ExpiryTime time.Time -} - -type ConvergeMessage struct { - Value interface{} -} - -func NewExpiryTimeUpdate(expiryTime time.Time) ExpiryTimeUpdate { - return ExpiryTimeUpdate{ExpiryTime: expiryTime} -} diff --git a/pkg/comms/events.go b/pkg/comms/events.go new file mode 100644 index 0000000..b5b4e60 --- /dev/null +++ b/pkg/comms/events.go @@ -0,0 +1,78 @@ +package comms + +import ( + "encoding/gob" + "os" + "os/user" + "runtime" + "time" +) + +// Client to server events + +type AgentInfo struct { + Username string + Hostname string + Pwd string + OS string +} + +type SessionInfo struct { + // "ssh", "sftp" + SessionType string +} + +type ExpiryTimeUpdate struct { + ExpiryTime time.Time +} + +type HeartBeat struct { + // Empty +} + +// Message sent from converge server to agent + +type UserPassword struct { + Username string + Password string +} + +// Generic wrapper message required to send messages of arbitrary type + +type ConvergeMessage struct { + Value interface{} +} + +func NewAgentInfo() AgentInfo { + username, _ := user.Current() + host, _ := os.Hostname() + pwd, _ := os.Getwd() + return AgentInfo{ + Username: username.Username, + Hostname: host, + Pwd: pwd, + OS: runtime.GOOS, + } +} + +func NewSessionInfo(sessionType string) SessionInfo { + return SessionInfo{SessionType: sessionType} +} + +func NewExpiryTimeUpdate(expiryTime time.Time) ExpiryTimeUpdate { + return ExpiryTimeUpdate{ExpiryTime: expiryTime} +} + +func RegisterEventsWithGob() { + // Agent to ConvergeServer + gob.Register(AgentInfo{}) + gob.Register(SessionInfo{}) + gob.Register(ExpiryTimeUpdate{}) + gob.Register(HeartBeat{}) + + // ConvergeServer to Agent + gob.Register(UserPassword{}) + + // Wrapper event. + gob.Register(ConvergeMessage{}) +} diff --git a/pkg/comms/serverapi.go b/pkg/comms/serverapi.go deleted file mode 100644 index e350817..0000000 --- a/pkg/comms/serverapi.go +++ /dev/null @@ -1,20 +0,0 @@ -package comms - -import ( - "log" - "time" -) - -func SendSessionInfo(commChannel CommChannel) { - err := commChannel.Encoder.Encode(ConvergeMessage{Value: NewSessionInfo()}) - if err != nil { - log.Printf("Encoding error %v", err) - } -} - -func SendExpiryTimeUpdate(commChannel CommChannel, expiryTime time.Time) { - err := commChannel.Encoder.Encode(ConvergeMessage{Value: NewExpiryTimeUpdate(expiryTime)}) - if err != nil { - log.Printf("Encoding error %v", err) - } -} diff --git a/pkg/converge/admin.go b/pkg/converge/admin.go index 3e76d9c..8062074 100644 --- a/pkg/converge/admin.go +++ b/pkg/converge/admin.go @@ -4,7 +4,6 @@ import ( "converge/pkg/comms" "converge/pkg/iowrappers" "fmt" - "github.com/hashicorp/yamux" "io" "log" "net" @@ -14,24 +13,27 @@ import ( type Agent struct { // server session - clientSession *yamux.Session - publicId string - startTime time.Time + commChannel comms.CommChannel + publicId string + startTime time.Time + + agentInfo comms.AgentInfo + expiryTime time.Time } type Client struct { - publicId string - agent net.Conn - client iowrappers.ReadWriteAddrCloser - startTime time.Time + publicId string + agent net.Conn + client iowrappers.ReadWriteAddrCloser + startTime time.Time + sessionType string } -func NewAgent(publicId string, - agentSession *yamux.Session) *Agent { +func NewAgent(commChannel comms.CommChannel, publicId string) *Agent { return &Agent{ - clientSession: agentSession, - publicId: publicId, - startTime: time.Now(), + commChannel: commChannel, + publicId: publicId, + startTime: time.Now(), } } @@ -62,19 +64,26 @@ func NewAdmin() *Admin { } func (admin *Admin) logStatus() { - log.Printf("%-20s %-20s %-20s\n", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") + fmt := "%-20s %-20s %-20s %-10s %-15s %-20s\n" + log.Printf(fmt, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME", + "USER", "HOST", "OS") for _, agent := range admin.agents { - agent.clientSession.RemoteAddr() - log.Printf("%-20s %-20s %-20s\n", agent.publicId, + agent.commChannel.Session.RemoteAddr() + log.Printf(fmt, agent.publicId, agent.startTime.Format(time.DateTime), - agent.clientSession.RemoteAddr().String()) + agent.expiryTime.Format(time.DateTime), + agent.agentInfo.Username, + agent.agentInfo.Hostname, + agent.agentInfo.OS) } log.Println("") - log.Printf("%-20s %-20s %-20s\n", "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") + fmt = "%-20s %-20s %-20s %-20s\n" + log.Printf(fmt, "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE") for _, client := range admin.clients { - log.Printf("%-20s %-20s %-20s", client.publicId, + log.Printf(fmt, client.publicId, client.startTime.Format(time.DateTime), - client.client.RemoteAddr()) + client.client.RemoteAddr(), + client.sessionType) } log.Printf("\n") } @@ -88,12 +97,12 @@ func (admin *Admin) addAgent(publicId string, conn io.ReadWriteCloser) (*Agent, return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId) } - clientSession, err := comms.NewCommChannel(comms.ConvergeServer, conn) + commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn) if err != nil { return nil, err } - agent = NewAgent(publicId, clientSession.Session) + agent = NewAgent(commChannel, publicId) admin.agents[publicId] = agent admin.logStatus() return agent, nil @@ -109,7 +118,7 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAd return nil, fmt.Errorf("No agent found for publicId '%s'", publicId) } - agentConn, err := agent.clientSession.Open() + agentConn, err := agent.commChannel.Session.Open() if err != nil { return nil, err } @@ -130,7 +139,7 @@ func (admin *Admin) RemoveAgent(publicId string) error { return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) } log.Printf("Removing agent: '%s'", publicId) - err := agent.clientSession.Close() + err := agent.commChannel.Session.Close() if err != nil { log.Printf("Could not close yamux client session for '%s'\n", publicId) } @@ -169,8 +178,34 @@ func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error { defer func() { admin.RemoveAgent(publicId) }() - log.Printf("Agent registered: '%s'\n", publicId) - for !agent.clientSession.IsClosed() { + + log.Println("Sending username and password to agent") + comms.Send(agent.commChannel, comms.UserPassword{Username: "abc", Password: "123"}) + + go func() { + comms.ListenForAgentEvents(agent.commChannel, + func(info comms.AgentInfo) { + agent.agentInfo = info + admin.logStatus() + }, + func(session comms.SessionInfo) { + for _, client := range admin.clients { + // a bit hacky. There should be at most one client that has an unset session + // Very unlikely for multiple sessions to start at the same point in time. + if client.publicId == agent.publicId && client.sessionType != session.SessionType { + client.sessionType = session.SessionType + break + } + } + }, + func(expiry comms.ExpiryTimeUpdate) { + agent.expiryTime = expiry.ExpiryTime + admin.logStatus() + }) + }() + + go log.Printf("Agent registered: '%s'\n", publicId) + for !agent.commChannel.Session.IsClosed() { time.Sleep(250 * time.Millisecond) } return nil @@ -190,11 +225,3 @@ func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser iowrappers.SynchronizeStreams(client.client, client.agent) return nil } - -func (admin *Admin) log() { - log.Println("CONNECTIONS") - for _, agent := range admin.agents { - log.Println(agent.publicId) - } - log.Printf("\n") -} diff --git a/static/index.html b/static/index.html index bdb7c5d..8bd0e35 100644 --- a/static/index.html +++ b/static/index.html @@ -68,19 +68,25 @@ # linux curl http{{.secure}}://{{.host}}/docs/agent > agent chmod 755 agent - ./agent ws{{.secure}}://{{.host}}/agent/ID + ./agent -id ID ws{{.secure}}://{{.host}} # windows curl http{{.secure}}://{{.host}}/docs/agent.exe > agent.exe - agent ws{{.secure}}://{{.host}}/agent/ID + agent -id ID ws{{.secure}}://{{.host}}

Above, ID is a unique id for the job, the so-called rendez-cous ID. This should not conflict with IDs used by other agents. The ID is used for a rendez-vous between the end-user on a local system and - the continuous integration job running on a build agent. + the continuous integration job running on a build agent. When ID is specified with the + -id option, a file with ssh authorized (public) keys must be provided in the current directory + named .convergekeys. The file must consist of one key per line. If the -id option is not + specified a random ID is generated and the user must login with a username and password. - This connects the agent to the converge server. Clients can now connect to the Converge - server to establish a connection to the CI job through converge. Communication between + The agent to the converge server and tells it the ID. Clients can now connect to the Converge + server to establish a connection to the CI job through converge by also specifying the same + ID. + + Communication between end-user and agent is encrypted using SSH and the rendez-vous server is unable to read the contents. The rendez-vous server is nothing more then a glorified bit pipe, simply transferring data between end-user SSH client and the agent which runs an @@ -96,6 +102,11 @@ is not detected properly.

+

+ The agent has more options, download the agent and run it without arguments to + see all options. +

+

Local clients: using ssh with a proxy command

wsproxy is a command that can be used as a proxy command for SSH which performs the connection to the