many small changes
* removed the Async utility * now using Ping message to webclient for keep alive instaed of actual content * added remote shell to AgentInfo * retry of connections to the agent * better logging for SynchronizeStreams
This commit is contained in:
		
							parent
							
								
									658aaf3880
								
							
						
					
					
						commit
						f0dd810541
					
				| @ -74,7 +74,7 @@ func sshServer(hostKeyFile string, shellCommand string, | ||||
| 			s.LocalAddr().String(), "ssh", | ||||
| 		) | ||||
| 		session.Login(sessionInfo, s) | ||||
| 		iowrappers.SynchronizeStreams(process.Pipe(), s) | ||||
| 		iowrappers.SynchronizeStreams("shell -- ssh", process.Pipe(), s) | ||||
| 		session.LogOut(sessionInfo.ClientId) | ||||
| 		// will cause addition goroutines to remmain alive when the SSH
 | ||||
| 		// session is killed. For now acceptable since the agent is a short-lived
 | ||||
| @ -108,7 +108,7 @@ func netCatServer(conn io.ReadWriter) { | ||||
| 	stdio := bufio.NewReadWriter( | ||||
| 		bufio.NewReaderSize(os.Stdin, 0), | ||||
| 		bufio.NewWriterSize(os.Stdout, 0)) | ||||
| 	iowrappers.SynchronizeStreams(conn, stdio) | ||||
| 	iowrappers.SynchronizeStreams("stdio -- ws", conn, stdio) | ||||
| } | ||||
| 
 | ||||
| type AgentService interface { | ||||
| @ -169,7 +169,7 @@ func printHelp(msg string) { | ||||
| 		"--id:               rendez-vous id. When specified an SSH authorized key must be used and password\n" + | ||||
| 		"                    based access is disabled. When not specified a random id is chosen by the agent and\n" + | ||||
| 		"                    password based access is possible. The password is configured on the converge server\n" + | ||||
| 		"--ssh-keys-file:  SSH authorized keys file in openssh format. By default .authorized_keys in the\n" + | ||||
| 		"--authorized-keys:  SSH authorized keys file in openssh format. By default .authorized_keys in the\n" + | ||||
| 		"                    directory where the agent is started is used.\n" + | ||||
| 		"--warning-time:     advance warning time before sessio ends\n" + | ||||
| 		"--expiry-time:      expiry time of the session\n" + | ||||
| @ -218,7 +218,7 @@ func main() { | ||||
| 		switch args[0] { | ||||
| 		case "--id": | ||||
| 			id, args = getArg(args) | ||||
| 		case "--ssh-keys-file": | ||||
| 		case "--authorized-keys": | ||||
| 			authorizedKeysFile, args = getArg(args) | ||||
| 		case "--warning-time": | ||||
| 			advanceWarningTime, args = parseDuration(args, val) | ||||
| @ -266,7 +266,8 @@ func main() { | ||||
| 	wsConn := websocketutil.NewWebSocketConn(conn, false) | ||||
| 	defer wsConn.Close() | ||||
| 
 | ||||
| 	serverInfo, err := comms.AgentInitialization(wsConn, comms.NewAgentInfo()) | ||||
| 	shell := chooseShell() | ||||
| 	serverInfo, err := comms.AgentInitialization(wsConn, comms.NewAgentInfo(shell)) | ||||
| 	if err != nil { | ||||
| 		log.Printf("ERROR: %v", err) | ||||
| 		os.Exit(1) | ||||
| @ -300,10 +301,6 @@ func main() { | ||||
| 		serverInfo.UserPassword, | ||||
| 		authorizedKeysFile) | ||||
| 
 | ||||
| 	// Choose shell
 | ||||
| 
 | ||||
| 	shell := chooseShell() | ||||
| 
 | ||||
| 	var service AgentService | ||||
| 
 | ||||
| 	service = ListenerServer(func() *ssh.Server { | ||||
|  | ||||
| @ -39,7 +39,7 @@ func handleConnection(conn net.Conn, wsURL string, insecure bool) { | ||||
| 	wsConn := websocketutil.NewWebSocketConn(_wsConn, false) | ||||
| 	defer wsConn.Close() | ||||
| 
 | ||||
| 	iowrappers.SynchronizeStreams(wsConn, conn) | ||||
| 	iowrappers.SynchronizeStreams(wsURL+" -- localport", wsConn, conn) | ||||
| } | ||||
| 
 | ||||
| func main() { | ||||
|  | ||||
| @ -73,5 +73,5 @@ func main() { | ||||
| 	wsConn := websocketutil.NewWebSocketConn(_wsConn, false) | ||||
| 	defer wsConn.Close() | ||||
| 
 | ||||
| 	iowrappers.SynchronizeStreams(wsConn, Stdio{}) | ||||
| 	iowrappers.SynchronizeStreams(wsURL+" -- stdio", wsConn, Stdio{}) | ||||
| } | ||||
|  | ||||
| @ -42,5 +42,5 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request, tcpConn net.Conn) { | ||||
| 	} | ||||
| 	defer wsConn.Close() | ||||
| 
 | ||||
| 	iowrappers.SynchronizeStreams(tcpConn, wsConn) | ||||
| 	iowrappers.SynchronizeStreams("tcp -- ws", tcpConn, wsConn) | ||||
| } | ||||
|  | ||||
| @ -3,7 +3,6 @@ package session | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"converge/pkg/comms" | ||||
| 	"converge/pkg/support/async" | ||||
| 	"fmt" | ||||
| 	"github.com/fsnotify/fsnotify" | ||||
| 	"github.com/gliderlabs/ssh" | ||||
| @ -107,7 +106,7 @@ func ConfigureAgent(commChannel comms.CommChannel, | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			<-state.ticker.C | ||||
| 			events <- async.Async(check) | ||||
| 			events <- check | ||||
| 		} | ||||
| 	}() | ||||
| 	go monitorHoldFile() | ||||
| @ -121,11 +120,15 @@ func ConfigureAgent(commChannel comms.CommChannel, | ||||
| } | ||||
| 
 | ||||
| func Login(sessionInfo comms.SessionInfo, sshSession ssh.Session) { | ||||
| 	events <- async.Async(login, sessionInfo, sshSession) | ||||
| 	events <- func() { | ||||
| 		login(sessionInfo, sshSession) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func LogOut(clientId string) { | ||||
| 	events <- async.Async(logOut, clientId) | ||||
| 	events <- func() { | ||||
| 		logOut(clientId) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Internal interface synchronous
 | ||||
| @ -148,7 +151,7 @@ func monitorHoldFile() { | ||||
| 			} | ||||
| 			base := filepath.Base(event.Name) | ||||
| 			if base == holdFilename { | ||||
| 				events <- async.Async(holdFileChange) | ||||
| 				events <- holdFileChange | ||||
| 			} | ||||
| 
 | ||||
| 		case err, ok := <-watcher.Errors: | ||||
|  | ||||
| @ -21,6 +21,7 @@ type AgentInfo struct { | ||||
| 	Hostname string | ||||
| 	Pwd      string | ||||
| 	OS       string | ||||
| 	Shell    string | ||||
| } | ||||
| 
 | ||||
| type ClientInfo struct { | ||||
| @ -72,7 +73,7 @@ type ConvergeMessage struct { | ||||
| 	Value interface{} | ||||
| } | ||||
| 
 | ||||
| func NewAgentInfo() AgentInfo { | ||||
| func NewAgentInfo(shell string) AgentInfo { | ||||
| 	username, _ := user.Current() | ||||
| 	host, _ := os.Hostname() | ||||
| 	pwd, _ := os.Getwd() | ||||
| @ -81,6 +82,7 @@ func NewAgentInfo() AgentInfo { | ||||
| 		Hostname: host, | ||||
| 		Pwd:      pwd, | ||||
| 		OS:       runtime.GOOS, | ||||
| 		Shell:    shell, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -189,11 +189,12 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteA | ||||
| 		return nil, fmt.Errorf("No agent found for PublicId '%s'", publicId) | ||||
| 	} | ||||
| 
 | ||||
| 	agentConn, err := agent.commChannel.Session.Open() | ||||
| 	agentConn, err := admin.getAgentConnection(agent) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	log.Println("Successful websocket connection to agent") | ||||
| 
 | ||||
| 	log.Println("Sending connection information to agent") | ||||
| 
 | ||||
| 	client := NewClient(publicId, clientConn, agentConn) | ||||
| @ -212,6 +213,18 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteA | ||||
| 	return client, nil | ||||
| } | ||||
| 
 | ||||
| func (admin *Admin) getAgentConnection(agent *AgentConnection) (net.Conn, error) { | ||||
| 	agentConn, err := agent.commChannel.Session.Open() | ||||
| 	count := 0 | ||||
| 	for err != nil && count < 10 { | ||||
| 		log.Printf("Retrying connection to agent: %v", err) | ||||
| 		time.Sleep(250 * time.Millisecond) | ||||
| 		count++ | ||||
| 		agentConn, err = agent.commChannel.Session.Open() | ||||
| 	} | ||||
| 	return agentConn, err | ||||
| } | ||||
| 
 | ||||
| func (admin *Admin) RemoveAgent(publicId string) error { | ||||
| 	admin.mutex.Lock() | ||||
| 	defer admin.mutex.Unlock() | ||||
| @ -312,6 +325,6 @@ func (admin *Admin) Connect(publicId string, conn iowrappers2.ReadWriteAddrClose | ||||
| 	}() | ||||
| 	log.Printf("Connecting client and agent: '%s'\n", publicId) | ||||
| 
 | ||||
| 	iowrappers2.SynchronizeStreams(client.client, client.agent) | ||||
| 	iowrappers2.SynchronizeStreams("client -- agent", client.client, client.agent) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -56,6 +56,8 @@ func (sessions *WebSessions) NewSession(wsConnection net.Conn) *WebSession { | ||||
| } | ||||
| 
 | ||||
| func (session *WebSession) WriteNotifications() { | ||||
| 	timer := time.NewTicker(10 * time.Second) | ||||
| 	defer timer.Stop() | ||||
| 	for { | ||||
| 		select { | ||||
| 		case notification, ok := <-session.notifications: | ||||
| @ -73,8 +75,8 @@ func (session *WebSession) WriteNotifications() { | ||||
| 				log.Printf("WS connection closed: %v", err) | ||||
| 				return | ||||
| 			} | ||||
| 		case <-time.After(10 * time.Second): | ||||
| 			_, err := session.conn.Write([]byte("<div>ping</div>")) | ||||
| 		case <-timer.C: | ||||
| 			_, err := session.conn.Write(make([]byte, 0, 0)) | ||||
| 			if err != nil { | ||||
| 				log.Printf("WS connection closed: %v", err) | ||||
| 				return | ||||
| @ -92,5 +94,5 @@ func (sessions *WebSessions) SessionClosed(session *WebSession) { | ||||
| } | ||||
| 
 | ||||
| func (sessions *WebSessions) logSessions() { | ||||
| 	log.Printf("New web session, nsessions %d", len(sessions.sessions)) | ||||
| 	log.Printf("Web session count %d", len(sessions.sessions)) | ||||
| } | ||||
|  | ||||
| @ -6,7 +6,7 @@ templ Sessions() { | ||||
|       <h1>sessions</h1> | ||||
| 
 | ||||
|       <div id="mycontent"> | ||||
|         Initial content | ||||
|         Loading... | ||||
|       </div> | ||||
|   </div> | ||||
| 
 | ||||
|  | ||||
| @ -26,11 +26,10 @@ templ Usage(secure string, host string, username string) { | ||||
|                           Above, ID is a unique id for the job, the so-called rendez-cous ID. This should not conflict with IDs | ||||
|                           used by other agents. The ID is used for a rendez-vous between the end-user on a local system and | ||||
|                           the continuous integration job running on a build agent. If you don't specify an id, a random | ||||
|                           id will be generated. | ||||
| 
 | ||||
|                           The agent to the converge server and tells it the ID. Clients can now connect to the Converge | ||||
|                           server to establish a connection to the CI job through converge by also specifying the same | ||||
|                           ID. | ||||
|                           id will be generated. If you specify a duplicate ID, the server will generate a new one andd the | ||||
|                           agent will tell you what id to use. | ||||
|                           Clients can now connect to the Converge server with the ID to establish a connection to | ||||
|                           the CI job through Converge. | ||||
| 
 | ||||
|                           Communication between | ||||
|                           end-user and agent is encrypted using SSH and the rendez-vous server is unable to | ||||
|  | ||||
| @ -1,18 +0,0 @@ | ||||
| package async | ||||
| 
 | ||||
| import "reflect" | ||||
| 
 | ||||
| func Async(fn interface{}, args ...interface{}) func() { | ||||
| 	fnValue := reflect.ValueOf(fn) | ||||
| 
 | ||||
| 	// Prepare the arguments
 | ||||
| 	params := make([]reflect.Value, len(args)) | ||||
| 	for i, arg := range args { | ||||
| 		params[i] = reflect.ValueOf(arg) | ||||
| 	} | ||||
| 
 | ||||
| 	// Return a function that, when called, will invoke the original function
 | ||||
| 	return func() { | ||||
| 		fnValue.Call(params) | ||||
| 	} | ||||
| } | ||||
| @ -5,7 +5,7 @@ import ( | ||||
| 	"log" | ||||
| ) | ||||
| 
 | ||||
| func SynchronizeStreams(stream1, stream2 io.ReadWriter) { | ||||
| func SynchronizeStreams(description string, stream1, stream2 io.ReadWriter) { | ||||
| 	waitChannel := make(chan bool, 2) | ||||
| 
 | ||||
| 	go func() { | ||||
| @ -14,7 +14,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { | ||||
| 		}() | ||||
| 		_, err := io.Copy(stream1, stream2) | ||||
| 		if err != nil { | ||||
| 			log.Printf("SynchronizeStreamms: error(1) %v\n", err) | ||||
| 			log.Printf("SynchronizeStreams: %s: error <-: %v\n", description, err) | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| @ -24,10 +24,10 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { | ||||
| 		}() | ||||
| 		_, err := io.Copy(stream2, stream1) | ||||
| 		if err != nil { | ||||
| 			log.Printf("SynchronizeStreams: error(2) %v\n", err) | ||||
| 			log.Printf("SynchronizeStreams: %s: error ->: %v\n", description, err) | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	<-waitChannel | ||||
| 	log.Println("SynchronizeStreams: Connection closed") | ||||
| 	log.Printf("SynchronizeStreams: %s: Connection closed", description) | ||||
| } | ||||
|  | ||||
| @ -33,7 +33,10 @@ func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) { | ||||
| 
 | ||||
| func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) { | ||||
| 	messageType := websocket.BinaryMessage | ||||
| 	if websocketConn.text { | ||||
| 	switch { | ||||
| 	case len(p) == 0: | ||||
| 		messageType = websocket.PingMessage | ||||
| 	case websocketConn.text: | ||||
| 		messageType = websocket.TextMessage | ||||
| 	} | ||||
| 	err = websocketConn.conn.WriteMessage(messageType, p) | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user