communication between agent and server. Removed the flags libray for command-line parsing.

Heartbeat mechanism from client to server over the custom connection for sending events to guarantee that the connectoin stays up.
This commit is contained in:
Erik Brakkee 2024-07-25 19:51:11 +02:00
parent 47754990a5
commit 2ed81c3174
10 changed files with 404 additions and 177 deletions

View File

@ -8,19 +8,21 @@ import (
"converge/pkg/terminal" "converge/pkg/terminal"
"converge/pkg/websocketutil" "converge/pkg/websocketutil"
"crypto/tls" "crypto/tls"
"flag"
"fmt" "fmt"
"github.com/gliderlabs/ssh" "github.com/gliderlabs/ssh"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/sftp" "github.com/pkg/sftp"
"io" "io"
"log" "log"
"math/rand"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"os/exec" "os/exec"
"regexp"
"runtime" "runtime"
"strconv"
"strings" "strings"
"time" "time"
@ -55,9 +57,11 @@ func SftpHandler(sess ssh.Session) {
} }
} }
var sshUserCredentials = comms.UserPassword{}
func passwordAuth(ctx ssh.Context, password string) bool { func passwordAuth(ctx ssh.Context, password string) bool {
// Replace with your own logic to validate username and password // Replace with your own logic to validate username and password
return ctx.User() == "abc" && password == "123" return ctx.User() == sshUserCredentials.Username && password == sshUserCredentials.Password
} }
func sshServer(hostKeyFile string, shellCommand string) *ssh.Server { func sshServer(hostKeyFile string, shellCommand string) *ssh.Server {
@ -135,35 +139,109 @@ func (f ReaderFunc) Read(p []byte) (n int, err error) {
return f(p) return f(p)
} }
func validateString(value, description, pattern string) {
matched, err := regexp.MatchString(pattern, value)
if err != nil || !matched {
printHelp(fmt.Sprintf("%s: wrong value '%s', must conform to pattern '%s'",
description, value, pattern))
}
}
func getId(id string) string {
if id == "" {
// not specified
return strconv.Itoa(time.Now().Nanosecond() % 1000000000)
}
validateString(id, "id", `^[a-zA-Z0-9-]+$`)
return id
}
func printHelp(msg string) {
if msg != "" {
fmt.Fprintf(os.Stderr, "ERROR: %s\n\n", msg)
}
helpText := "agent [options] <wsUrl> \n" +
"\n" +
"Run agent with <wsUrl> of the form ws[s]://<host>[:port]\n" +
"Here <ID> is the unique id of the agent that allows rendez-vous with an end-user.\n" +
"The end-user must specify the same id when connecting using ssh.\n" +
"\n" +
"--id: Rendez-vous id. When specified an SSH authorized key must be used and password\n" +
" based access is disabled. When not specified a random id is chosen by the agent and\n" +
" password based access is possible. The password is configured on the converge server\n" +
"--warning-time: advance warning time before sessio ends\n" +
"--expiry-time: expiry time of the session\n" +
"--check-interval: interval at which expiry is checked\n" +
"-insecure: allow invalid certificates\n"
fmt.Fprintln(os.Stderr, helpText)
os.Exit(1)
}
func getArg(args []string) (value string, ret []string) {
if len(args) < 2 {
printHelp(fmt.Sprintf("The '%s' option expects an argument", args[0]))
}
return args[1], args[1:]
}
func parseDuration(args []string, val string) (time.Duration, []string) {
duration, err := time.ParseDuration(val)
if err != nil {
printHelp(fmt.Sprintf("Error parsing duration: %v\n", err))
}
return duration, args[1:]
}
func main() { func main() {
usage := "agent [options] <wsUrl> \n" + // Random user name and password so that effectively no one can login
"\n" + // until the user and password have been received from the server.
"Run agent with <wsUrl> of the form ws[s]://<host>[:port]/agent/<ID>\n" + sshUserCredentials = comms.UserPassword{
"Here <ID> is the unique id of the agent that allows rendez-vous with an end-user.\n" + Username: strconv.Itoa(rand.Int()),
"The end-user must specify the same id when connecting using ssh.\n" Password: strconv.Itoa(rand.Int()),
flag.Usage = func() {
fmt.Fprintf(os.Stderr, usage+"\n")
flag.PrintDefaults()
} }
advanceWarningTime := flag.Duration("warning-time", 5*time.Minute, "advance warning time before sessio ends")
agentExpriryTime := flag.Duration("expiry-time", 10*time.Minute, "expiry time of the session")
tickerInterval := flag.Duration("check-interval", 60*time.Second, "interval at which expiry is checked")
insecure := flag.Bool("insecure", false, "allow invalid certificates")
flag.Parse() id := ""
if flag.NArg() != 1 { advanceWarningTime := 5 * time.Minute
flag.Usage() agentExpriryTime := 10 * time.Minute
os.Exit(1) tickerInterval := 60 * time.Second
insecure := false
args := os.Args[1:]
for len(args) > 0 && strings.HasPrefix(args[0], "-") {
val := ""
switch args[0] {
case "--id":
id, args = getArg(args)
case "--warning-time":
advanceWarningTime, args = parseDuration(args, val)
case "--expiry-time":
agentExpriryTime, args = parseDuration(args, val)
case "--check-interval":
tickerInterval, args = parseDuration(args, val)
case "--insecure":
insecure = true
default:
printHelp("Unknown option " + args[0])
}
args = args[1:]
} }
wsURL := flag.Arg(0)
id = getId(id)
if len(args) != 1 {
printHelp("")
}
validateString(args[0], "wsUrl", `^wss?://[a-zA-Z0-9]*(:[0-9]+)?$`)
wsURL := args[0] + "/agent/" + id
dialer := websocket.Dialer{ dialer := websocket.Dialer{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second, HandshakeTimeout: 45 * time.Second,
} }
if *insecure { if insecure {
dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
} }
conn, _, err := dialer.Dial(wsURL, nil) conn, _, err := dialer.Dial(wsURL, nil)
@ -179,6 +257,11 @@ func main() {
panic(err) panic(err)
} }
go comms.ListenForServerEvents(commChannel, func(user comms.UserPassword) {
log.Println("Username and password configuration received from server")
sshUserCredentials = user
})
var service AgentService var service AgentService
shells := []string{"bash", "sh", "ash", "ksh", "zsh", "fish", "tcsh", "csh"} shells := []string{"bash", "sh", "ash", "ksh", "zsh", "fish", "tcsh", "csh"}
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
@ -206,10 +289,10 @@ func main() {
log.Printf("Clients should use the following commands to connect to this agent:") log.Printf("Clients should use the following commands to connect to this agent:")
log.Println() log.Println()
clientUrl := strings.ReplaceAll(wsURL, "/agent/", "/client/") clientUrl := strings.ReplaceAll(wsURL, "/agent/", "/client/")
sshCommand := fmt.Sprintf("ssh -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" abc@localhost", sshCommand := fmt.Sprintf("ssh -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" %s@localhost",
clientUrl) clientUrl, sshUserCredentials.Username)
sftpCommand := fmt.Sprintf("sftp -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" abc@localhost", sftpCommand := fmt.Sprintf("sftp -oServerAliveInterval=10 -oProxyCommand=\"wsproxy %s\" %s@localhost",
clientUrl) clientUrl, sshUserCredentials.Username)
log.Println(" # For SSH") log.Println(" # For SSH")
log.Println(" " + sshCommand) log.Println(" " + sshCommand)
log.Println() log.Println()
@ -223,6 +306,6 @@ func main() {
"://"+urlObject.Host+"/docs/wsproxy") "://"+urlObject.Host+"/docs/wsproxy")
log.Println() log.Println()
agent.ConfigureAgent(commChannel, *advanceWarningTime, *agentExpriryTime, *tickerInterval) agent.ConfigureAgent(commChannel, advanceWarningTime, agentExpriryTime, tickerInterval)
service.Run(commChannel.Session) service.Run(commChannel.Session)
} }

View File

@ -3,13 +3,13 @@ package main
import ( import (
"converge/pkg/converge" "converge/pkg/converge"
"converge/pkg/websocketutil" "converge/pkg/websocketutil"
"flag"
"fmt" "fmt"
"log" "log"
"net" "net"
"net/http" "net/http"
"os" "os"
"regexp" "regexp"
"strings"
) )
func parsePublicId(path string) (publicId string, _ error) { func parsePublicId(path string) (publicId string, _ error) {
@ -26,32 +26,49 @@ func catchAllHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
func main() { func printHelp(msg string) {
downloadOption := flag.String("d", "downloads", if msg != "" {
"directory where documentation is located, either relative to current directory or an absolute path") fmt.Fprintf(os.Stderr, "ERROR: %s\n\n", msg)
}
helpText := "Usage: converge [options]\n" +
"\n" +
"Converge server is a rendez-vous server for debugging continuous integration\n" +
"jobs be providing the capability to log into the agents where jobs are running.\n" +
"This is achieve by starting an agent in the continuous integration job\n" +
"which connects to Converge using a websocket connection. The end user also connects\n" +
"to Converge using ssh over websockets. The server then matches the end-user with\n" +
"the agent running in the continous integration job (the rendez-vous) and sets up\n" +
"an end-to-end SSH connection between end-user and agent, with the agent providing\n" +
"an embedded SSH server to provide interactive access to the end-user. This works\n" +
"both on linux and on windows.\n" +
"\n" +
"-d <contentdir>: directory where static content of converge is placed"
fmt.Fprintln(os.Stderr, helpText)
os.Exit(1)
}
flag.Usage = func() { func main() {
fmt.Fprintln(os.Stderr, "Usage: converge [options]") downloadDir := "downloads"
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, "Converge server is a rendez-vous server for debugging continuous integration") args := os.Args[1:]
fmt.Fprintln(os.Stderr, "jobs be providing the capability to log into the agents where jobs are running.") for len(args) > 0 && strings.HasPrefix(args[0], "-") {
fmt.Fprintln(os.Stderr, "This is achieve by starting an agent in the continuous integration job") switch args[0] {
fmt.Fprintln(os.Stderr, "which connects to Converge using a websocket connection. The end user also connects") case "-d":
fmt.Fprintln(os.Stderr, "to Converge using ssh over websockets. The server then matches the end-user with") if len(args) <= 1 {
fmt.Fprintln(os.Stderr, "the agent running in the continous integration job (the rendez-vous) and sets up") printHelp("The -d option expects an argument")
fmt.Fprintln(os.Stderr, "an end-to-end SSH connection between end-user and agent, with the agent providing") }
fmt.Fprintln(os.Stderr, "an embedded SSH server to provide interactive access to the end-user. This works") downloadDir = args[1]
fmt.Fprintln(os.Stderr, "both on linux and on windows.") args = args[1:]
fmt.Fprintln(os.Stderr) default:
flag.PrintDefaults() printHelp("Unknown option " + args[0])
}
args = args[1:]
} }
flag.Parse() log.Println("Content directory", downloadDir)
if flag.NArg() != 0 {
flag.Usage() if len(args) != 0 {
os.Exit(1) printHelp("")
} }
downloadDir := *downloadOption
log.Println("Download directory", downloadDir)
admin := converge.NewAdmin() admin := converge.NewAdmin()
registrationService := websocketutil.WebSocketService{ registrationService := websocketutil.WebSocketService{

View File

@ -4,7 +4,6 @@ import (
"converge/pkg/iowrappers" "converge/pkg/iowrappers"
"converge/pkg/websocketutil" "converge/pkg/websocketutil"
"crypto/tls" "crypto/tls"
"flag"
"fmt" "fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"log" "log"
@ -44,23 +43,24 @@ func handleConnection(conn net.Conn, wsURL string, insecure bool) {
} }
func main() { func main() {
usage := "Usage: tcptows [options] <localport> ws[s]://<host>[:port]/client/<ID>\n" + usage := "Usage: tcptows [--insecure] <localport> ws[s]://<host>[:port]/client/<ID>\n" +
"\n" + "\n" +
"Here <ID> is the rendez-vous id of a continuous integratio job\n" "Here <ID> is the rendez-vous id of a continuous integratio job\n"
insecure := flag.Bool("insecure", false, "allow invalid certificates") insecure := false
flag.Usage = func() { args := os.Args[1:]
fmt.Fprintln(os.Stderr, usage)
flag.PrintDefaults() if len(args) == 3 && args[0] == "--insecure" {
insecure = true
args = args[1:]
} }
flag.Parse()
fmt.Println("Narg ", flag.NArg()) if len(args) != 2 {
if flag.NArg() != 2 { fmt.Fprintf(os.Stderr, usage)
flag.Usage()
os.Exit(1) os.Exit(1)
} }
tcpPort := flag.Arg(0) tcpPort := args[0]
wsURL := flag.Arg(1) wsURL := args[1]
listener, err := net.Listen("tcp", ":"+tcpPort) listener, err := net.Listen("tcp", ":"+tcpPort)
if err != nil { if err != nil {
@ -77,6 +77,6 @@ func main() {
log.Println(err) log.Println(err)
continue continue
} }
go handleConnection(conn, wsURL, *insecure) go handleConnection(conn, wsURL, insecure)
} }
} }

View File

@ -4,7 +4,6 @@ import (
"converge/pkg/iowrappers" "converge/pkg/iowrappers"
"converge/pkg/websocketutil" "converge/pkg/websocketutil"
"crypto/tls" "crypto/tls"
"flag"
"fmt" "fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"log" "log"
@ -31,36 +30,36 @@ func (stdio Stdio) Write(b []byte) (n int, err error) {
} }
func main() { func main() {
usage := "Usage: tcptows ws[s]://<host>[:port]/client/<ID>\n\n" + usage := "Usage: wsproxy [--insecure] ws[s]://<host>[:port]/client/<ID>\n\n" +
"\n" + "\n" +
"Here <ID> is the rendez-vous id of a continuous integration job\n" + "Here <ID> is the rendez-vous id of a continuous integration job\n" +
"\n" + "\n" +
"Use this in an ssh command like this: \n" + "Use this in an ssh command like this: \n" +
"\n" + "\n" +
" ssh -oProxyCommand='wsproxy ws[s]://<host>[:port]/client/<ID>' abc@localhost\n" + " ssh -oServerAliveInterval=10 -oProxyCommand='wsproxy ws[s]://<host>[:port]/client/<ID>' abc@localhost\n" +
"\n" + "\n" +
"This latssh connect through wsproxy tocalhost\n" "This latssh connect through wsproxy tocalhost\n"
insecure := flag.Bool("insecure", false, "allow invalid certificates") args := os.Args[1:]
insecure := false
flag.Usage = func() { if len(args) == 2 && args[0] == "--insecure" {
fmt.Fprintln(os.Stderr, usage) insecure = true
flag.PrintDefaults() args = args[1:]
} }
flag.Parse()
log.Println("Narg ", flag.NFlag()) if len(args) != 1 {
if flag.NArg() != 1 { fmt.Fprintf(os.Stderr, usage)
flag.Usage()
os.Exit(1) os.Exit(1)
} }
wsURL := flag.Arg(0) wsURL := args[0]
dialer := websocket.Dialer{ dialer := websocket.Dialer{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second, HandshakeTimeout: 45 * time.Second,
} }
if *insecure { if insecure {
dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
} }
_wsConn, _, err := dialer.Dial(wsURL, nil) _wsConn, _, err := dialer.Dial(wsURL, nil)

View File

@ -101,8 +101,8 @@ func ConfigureAgent(commChannel comms.CommChannel,
log.Printf("Agent expires at %s", log.Printf("Agent expires at %s",
state.expiryTime(holdFilename).Format(time.DateTime)) state.expiryTime(holdFilename).Format(time.DateTime))
comms.SendSessionInfo(state.commChannel) comms.Send(state.commChannel, comms.NewAgentInfo())
comms.SendExpiryTimeUpdate(state.commChannel, state.expiryTime(holdFilename)) comms.Send(state.commChannel, comms.NewExpiryTimeUpdate(state.expiryTime(holdFilename)))
go func() { go func() {
for { for {
@ -178,6 +178,12 @@ func login(sessionId int, sshSession ssh.Session) {
log.Println("New login") log.Println("New login")
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
sessionType := sshSession.Subsystem()
if sessionType == "" {
sessionType = "ssh"
}
comms.Send(state.commChannel, comms.NewSessionInfo(sessionType))
holdFileStats, ok := fileExistsWithStats(holdFilename) holdFileStats, ok := fileExistsWithStats(holdFilename)
if ok { if ok {
if holdFileStats.ModTime().After(time.Now()) { if holdFileStats.ModTime().After(time.Now()) {
@ -217,7 +223,7 @@ func formatHelpMessage() string {
panic(err) panic(err)
} }
helpFormattedBuf := bytes.NewBuffer(make([]byte, 0)) helpFormattedBuf := bytes.NewBuffer(make([]byte, 0))
log.Println("Runnning on ", runtime.GOOS) log.Println("Running on ", runtime.GOOS)
data := map[string]string{"os": runtime.GOOS} data := map[string]string{"os": runtime.GOOS}
err = templ.Execute(helpFormattedBuf, data) err = templ.Execute(helpFormattedBuf, data)
if err != nil { if err != nil {
@ -291,7 +297,7 @@ func holdFileChange() {
message += holdFileMessage() message += holdFileMessage()
messageUsers(message) messageUsers(message)
state.lastExpiryTimmeReported = newExpiryTIme state.lastExpiryTimmeReported = newExpiryTIme
comms.SendExpiryTimeUpdate(state.commChannel, state.lastExpiryTimmeReported) comms.Send(state.commChannel, comms.NewExpiryTimeUpdate(state.expiryTime(holdFilename)))
} }
} }

View File

@ -7,8 +7,6 @@ import (
"io" "io"
"log" "log"
"net" "net"
"os"
"os/user"
"time" "time"
) )
@ -19,6 +17,12 @@ type CommChannel struct {
Session *yamux.Session Session *yamux.Session
} }
type AgentListener interface {
AgentInfo(agent AgentInfo)
SessionInfo(session SessionInfo)
ExpiryTimeUpdate(session ExpiryTimeUpdate)
}
type Role int type Role int
const ( const (
@ -72,67 +76,89 @@ func NewCommChannel(role Role, wsConn io.ReadWriteCloser) (CommChannel, error) {
} }
log.Println("Communication channel between agent and converge server established") log.Println("Communication channel between agent and converge server established")
gob.Register(SessionInfo{}) RegisterEventsWithGob()
gob.Register(ExpiryTimeUpdate{})
gob.Register(ConvergeMessage{})
commChannel.Encoder = gob.NewEncoder(commChannel.Peer) commChannel.Encoder = gob.NewEncoder(commChannel.Peer)
commChannel.Decoder = gob.NewDecoder(commChannel.Peer) commChannel.Decoder = gob.NewDecoder(commChannel.Peer)
switch role { // heartbeat
case ConvergeServer: if role == Agent {
go serverReader(commChannel) go func() {
for {
time.Sleep(10 * time.Second)
err := Send(commChannel, HeartBeat{})
if err != nil {
log.Println("Sending heartbeat to server failed")
}
}
}()
} }
return commChannel, nil return commChannel, nil
} }
func serverReader(channel CommChannel) { // Sending an event to the other side
func Send(commChannel CommChannel, object any) error {
err := commChannel.Encoder.Encode(ConvergeMessage{Value: object})
if err != nil {
log.Printf("Encoding error %v", err)
}
return err
}
func ListenForAgentEvents(channel CommChannel,
agentInfo func(agent AgentInfo),
sessionInfo func(session SessionInfo),
expiryTimeUpdate func(session ExpiryTimeUpdate)) {
for { for {
var result ConvergeMessage var result ConvergeMessage
err := channel.Decoder.Decode(&result) err := channel.Decoder.Decode(&result)
if err != nil { if err != nil {
// TODO more clean solution, need to explicitly close when agent exits. // TODO more clean solution, need to explicitly close when agent exits.
log.Printf("Exiting serverReader %v", err) log.Printf("Exiting agent listener %v", err)
return return
} }
switch v := result.Value.(type) { switch v := result.Value.(type) {
case AgentInfo:
agentInfo(v)
case SessionInfo: case SessionInfo:
log.Println("RECEIVED: session info ", v) sessionInfo(v)
case ExpiryTimeUpdate: case ExpiryTimeUpdate:
log.Println("RECEIVED: expirytime update ", v) expiryTimeUpdate(v)
case HeartBeat:
// for not ignoring, can also implement behavior
// when heartbeat not received but hearbeat is only
// intended to keep the connection up
default: default:
fmt.Printf(" Unknown type: %T\n", v) fmt.Printf(" Unknown type: %T\n", v)
} }
} }
} }
type SessionInfo struct { func ListenForServerEvents(channel CommChannel,
Username string setUsernamePassword func(user UserPassword)) {
Hostname string for {
Pwd string var result ConvergeMessage
} err := channel.Decoder.Decode(&result)
func NewSessionInfo() SessionInfo { if err != nil {
username, _ := user.Current() // TODO more clean solution, need to explicitly close when agent exits.
host, _ := os.Hostname() log.Printf("Exiting agent listener %v", err)
pwd, _ := os.Getwd() return
return SessionInfo{ }
Username: username.Username, switch v := result.Value.(type) {
Hostname: host,
Pwd: pwd, case UserPassword:
setUsernamePassword(v)
default:
fmt.Printf(" Unknown type: %T\n", v)
}
} }
} }
type ExpiryTimeUpdate struct {
ExpiryTime time.Time
}
type ConvergeMessage struct {
Value interface{}
}
func NewExpiryTimeUpdate(expiryTime time.Time) ExpiryTimeUpdate {
return ExpiryTimeUpdate{ExpiryTime: expiryTime}
}

78
pkg/comms/events.go Normal file
View File

@ -0,0 +1,78 @@
package comms
import (
"encoding/gob"
"os"
"os/user"
"runtime"
"time"
)
// Client to server events
type AgentInfo struct {
Username string
Hostname string
Pwd string
OS string
}
type SessionInfo struct {
// "ssh", "sftp"
SessionType string
}
type ExpiryTimeUpdate struct {
ExpiryTime time.Time
}
type HeartBeat struct {
// Empty
}
// Message sent from converge server to agent
type UserPassword struct {
Username string
Password string
}
// Generic wrapper message required to send messages of arbitrary type
type ConvergeMessage struct {
Value interface{}
}
func NewAgentInfo() AgentInfo {
username, _ := user.Current()
host, _ := os.Hostname()
pwd, _ := os.Getwd()
return AgentInfo{
Username: username.Username,
Hostname: host,
Pwd: pwd,
OS: runtime.GOOS,
}
}
func NewSessionInfo(sessionType string) SessionInfo {
return SessionInfo{SessionType: sessionType}
}
func NewExpiryTimeUpdate(expiryTime time.Time) ExpiryTimeUpdate {
return ExpiryTimeUpdate{ExpiryTime: expiryTime}
}
func RegisterEventsWithGob() {
// Agent to ConvergeServer
gob.Register(AgentInfo{})
gob.Register(SessionInfo{})
gob.Register(ExpiryTimeUpdate{})
gob.Register(HeartBeat{})
// ConvergeServer to Agent
gob.Register(UserPassword{})
// Wrapper event.
gob.Register(ConvergeMessage{})
}

View File

@ -1,20 +0,0 @@
package comms
import (
"log"
"time"
)
func SendSessionInfo(commChannel CommChannel) {
err := commChannel.Encoder.Encode(ConvergeMessage{Value: NewSessionInfo()})
if err != nil {
log.Printf("Encoding error %v", err)
}
}
func SendExpiryTimeUpdate(commChannel CommChannel, expiryTime time.Time) {
err := commChannel.Encoder.Encode(ConvergeMessage{Value: NewExpiryTimeUpdate(expiryTime)})
if err != nil {
log.Printf("Encoding error %v", err)
}
}

View File

@ -4,7 +4,6 @@ import (
"converge/pkg/comms" "converge/pkg/comms"
"converge/pkg/iowrappers" "converge/pkg/iowrappers"
"fmt" "fmt"
"github.com/hashicorp/yamux"
"io" "io"
"log" "log"
"net" "net"
@ -14,24 +13,27 @@ import (
type Agent struct { type Agent struct {
// server session // server session
clientSession *yamux.Session commChannel comms.CommChannel
publicId string publicId string
startTime time.Time startTime time.Time
agentInfo comms.AgentInfo
expiryTime time.Time
} }
type Client struct { type Client struct {
publicId string publicId string
agent net.Conn agent net.Conn
client iowrappers.ReadWriteAddrCloser client iowrappers.ReadWriteAddrCloser
startTime time.Time startTime time.Time
sessionType string
} }
func NewAgent(publicId string, func NewAgent(commChannel comms.CommChannel, publicId string) *Agent {
agentSession *yamux.Session) *Agent {
return &Agent{ return &Agent{
clientSession: agentSession, commChannel: commChannel,
publicId: publicId, publicId: publicId,
startTime: time.Now(), startTime: time.Now(),
} }
} }
@ -62,19 +64,26 @@ func NewAdmin() *Admin {
} }
func (admin *Admin) logStatus() { func (admin *Admin) logStatus() {
log.Printf("%-20s %-20s %-20s\n", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") fmt := "%-20s %-20s %-20s %-10s %-15s %-20s\n"
log.Printf(fmt, "AGENT", "ACTIVE_SINCE", "EXPIRY_TIME",
"USER", "HOST", "OS")
for _, agent := range admin.agents { for _, agent := range admin.agents {
agent.clientSession.RemoteAddr() agent.commChannel.Session.RemoteAddr()
log.Printf("%-20s %-20s %-20s\n", agent.publicId, log.Printf(fmt, agent.publicId,
agent.startTime.Format(time.DateTime), agent.startTime.Format(time.DateTime),
agent.clientSession.RemoteAddr().String()) agent.expiryTime.Format(time.DateTime),
agent.agentInfo.Username,
agent.agentInfo.Hostname,
agent.agentInfo.OS)
} }
log.Println("") log.Println("")
log.Printf("%-20s %-20s %-20s\n", "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS") fmt = "%-20s %-20s %-20s %-20s\n"
log.Printf(fmt, "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS", "SESSION_TYPE")
for _, client := range admin.clients { for _, client := range admin.clients {
log.Printf("%-20s %-20s %-20s", client.publicId, log.Printf(fmt, client.publicId,
client.startTime.Format(time.DateTime), client.startTime.Format(time.DateTime),
client.client.RemoteAddr()) client.client.RemoteAddr(),
client.sessionType)
} }
log.Printf("\n") log.Printf("\n")
} }
@ -88,12 +97,12 @@ func (admin *Admin) addAgent(publicId string, conn io.ReadWriteCloser) (*Agent,
return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId) return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId)
} }
clientSession, err := comms.NewCommChannel(comms.ConvergeServer, conn) commChannel, err := comms.NewCommChannel(comms.ConvergeServer, conn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
agent = NewAgent(publicId, clientSession.Session) agent = NewAgent(commChannel, publicId)
admin.agents[publicId] = agent admin.agents[publicId] = agent
admin.logStatus() admin.logStatus()
return agent, nil return agent, nil
@ -109,7 +118,7 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAd
return nil, fmt.Errorf("No agent found for publicId '%s'", publicId) return nil, fmt.Errorf("No agent found for publicId '%s'", publicId)
} }
agentConn, err := agent.clientSession.Open() agentConn, err := agent.commChannel.Session.Open()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -130,7 +139,7 @@ func (admin *Admin) RemoveAgent(publicId string) error {
return fmt.Errorf("Cannot remove agent: '%s' not found", publicId) return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
} }
log.Printf("Removing agent: '%s'", publicId) log.Printf("Removing agent: '%s'", publicId)
err := agent.clientSession.Close() err := agent.commChannel.Session.Close()
if err != nil { if err != nil {
log.Printf("Could not close yamux client session for '%s'\n", publicId) log.Printf("Could not close yamux client session for '%s'\n", publicId)
} }
@ -169,8 +178,34 @@ func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error {
defer func() { defer func() {
admin.RemoveAgent(publicId) admin.RemoveAgent(publicId)
}() }()
log.Printf("Agent registered: '%s'\n", publicId)
for !agent.clientSession.IsClosed() { log.Println("Sending username and password to agent")
comms.Send(agent.commChannel, comms.UserPassword{Username: "abc", Password: "123"})
go func() {
comms.ListenForAgentEvents(agent.commChannel,
func(info comms.AgentInfo) {
agent.agentInfo = info
admin.logStatus()
},
func(session comms.SessionInfo) {
for _, client := range admin.clients {
// a bit hacky. There should be at most one client that has an unset session
// Very unlikely for multiple sessions to start at the same point in time.
if client.publicId == agent.publicId && client.sessionType != session.SessionType {
client.sessionType = session.SessionType
break
}
}
},
func(expiry comms.ExpiryTimeUpdate) {
agent.expiryTime = expiry.ExpiryTime
admin.logStatus()
})
}()
go log.Printf("Agent registered: '%s'\n", publicId)
for !agent.commChannel.Session.IsClosed() {
time.Sleep(250 * time.Millisecond) time.Sleep(250 * time.Millisecond)
} }
return nil return nil
@ -190,11 +225,3 @@ func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser
iowrappers.SynchronizeStreams(client.client, client.agent) iowrappers.SynchronizeStreams(client.client, client.agent)
return nil return nil
} }
func (admin *Admin) log() {
log.Println("CONNECTIONS")
for _, agent := range admin.agents {
log.Println(agent.publicId)
}
log.Printf("\n")
}

View File

@ -68,19 +68,25 @@
# linux # linux
curl http{{.secure}}://{{.host}}/docs/agent > agent curl http{{.secure}}://{{.host}}/docs/agent > agent
chmod 755 agent chmod 755 agent
./agent ws{{.secure}}://{{.host}}/agent/ID ./agent -id ID ws{{.secure}}://{{.host}}
# windows # windows
curl http{{.secure}}://{{.host}}/docs/agent.exe > agent.exe curl http{{.secure}}://{{.host}}/docs/agent.exe > agent.exe
agent ws{{.secure}}://{{.host}}/agent/ID agent -id ID ws{{.secure}}://{{.host}}
</pre> </pre>
<p> <p>
Above, ID is a unique id for the job, the so-called rendez-cous ID. This should not conflict with IDs Above, ID is a unique id for the job, the so-called rendez-cous ID. This should not conflict with IDs
used by other agents. The ID is used for a rendez-vous between the end-user on a local system and used by other agents. The ID is used for a rendez-vous between the end-user on a local system and
the continuous integration job running on a build agent. the continuous integration job running on a build agent. When ID is specified with the
-id option, a file with ssh authorized (public) keys must be provided in the current directory
named .convergekeys. The file must consist of one key per line. If the -id option is not
specified a random ID is generated and the user must login with a username and password.
This connects the agent to the converge server. Clients can now connect to the Converge The agent to the converge server and tells it the ID. Clients can now connect to the Converge
server to establish a connection to the CI job through converge. Communication between server to establish a connection to the CI job through converge by also specifying the same
ID.
Communication between
end-user and agent is encrypted using SSH and the rendez-vous server is unable to end-user and agent is encrypted using SSH and the rendez-vous server is unable to
read the contents. The rendez-vous server is nothing more then a glorified bit pipe, read the contents. The rendez-vous server is nothing more then a glorified bit pipe,
simply transferring data between end-user SSH client and the agent which runs an simply transferring data between end-user SSH client and the agent which runs an
@ -96,6 +102,11 @@
is not detected properly. is not detected properly.
</p> </p>
<p>
The agent has more options, download the agent and run it without arguments to
see all options.
</p>
<h2>Local clients: using ssh with a proxy command </h2> <h2>Local clients: using ssh with a proxy command </h2>
<p><code>wsproxy</code> is a command that can be used as a proxy command for SSH which performs the connection to the <p><code>wsproxy</code> is a command that can be used as a proxy command for SSH which performs the connection to the