many small changes

* removed the Async utility
* now using Ping message to webclient for keep alive instaed of actual content
* added remote shell to AgentInfo
* retry of connections to the agent
* better logging for SynchronizeStreams
This commit is contained in:
Erik Brakkee 2024-07-31 19:30:38 +02:00 committed by Erik Brakkee
parent 0f10e1d8e7
commit 882f97fa17
13 changed files with 61 additions and 60 deletions

View File

@ -74,7 +74,7 @@ func sshServer(hostKeyFile string, shellCommand string,
s.LocalAddr().String(), "ssh",
)
session.Login(sessionInfo, s)
iowrappers.SynchronizeStreams(process.Pipe(), s)
iowrappers.SynchronizeStreams("shell -- ssh", process.Pipe(), s)
session.LogOut(sessionInfo.ClientId)
// will cause addition goroutines to remmain alive when the SSH
// session is killed. For now acceptable since the agent is a short-lived
@ -108,7 +108,7 @@ func netCatServer(conn io.ReadWriter) {
stdio := bufio.NewReadWriter(
bufio.NewReaderSize(os.Stdin, 0),
bufio.NewWriterSize(os.Stdout, 0))
iowrappers.SynchronizeStreams(conn, stdio)
iowrappers.SynchronizeStreams("stdio -- ws", conn, stdio)
}
type AgentService interface {
@ -169,7 +169,7 @@ func printHelp(msg string) {
"--id: rendez-vous id. When specified an SSH authorized key must be used and password\n" +
" based access is disabled. When not specified a random id is chosen by the agent and\n" +
" password based access is possible. The password is configured on the converge server\n" +
"--ssh-keys-file: SSH authorized keys file in openssh format. By default .authorized_keys in the\n" +
"--authorized-keys: SSH authorized keys file in openssh format. By default .authorized_keys in the\n" +
" directory where the agent is started is used.\n" +
"--warning-time: advance warning time before sessio ends\n" +
"--expiry-time: expiry time of the session\n" +
@ -218,7 +218,7 @@ func main() {
switch args[0] {
case "--id":
id, args = getArg(args)
case "--ssh-keys-file":
case "--authorized-keys":
authorizedKeysFile, args = getArg(args)
case "--warning-time":
advanceWarningTime, args = parseDuration(args, val)
@ -266,7 +266,8 @@ func main() {
wsConn := websocketutil.NewWebSocketConn(conn, false)
defer wsConn.Close()
serverInfo, err := comms.AgentInitialization(wsConn, comms.NewAgentInfo())
shell := chooseShell()
serverInfo, err := comms.AgentInitialization(wsConn, comms.NewAgentInfo(shell))
if err != nil {
log.Printf("ERROR: %v", err)
os.Exit(1)
@ -300,10 +301,6 @@ func main() {
serverInfo.UserPassword,
authorizedKeysFile)
// Choose shell
shell := chooseShell()
var service AgentService
service = ListenerServer(func() *ssh.Server {

View File

@ -39,7 +39,7 @@ func handleConnection(conn net.Conn, wsURL string, insecure bool) {
wsConn := websocketutil.NewWebSocketConn(_wsConn, false)
defer wsConn.Close()
iowrappers.SynchronizeStreams(wsConn, conn)
iowrappers.SynchronizeStreams(wsURL+" -- localport", wsConn, conn)
}
func main() {

View File

@ -73,5 +73,5 @@ func main() {
wsConn := websocketutil.NewWebSocketConn(_wsConn, false)
defer wsConn.Close()
iowrappers.SynchronizeStreams(wsConn, Stdio{})
iowrappers.SynchronizeStreams(wsURL+" -- stdio", wsConn, Stdio{})
}

View File

@ -42,5 +42,5 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request, tcpConn net.Conn) {
}
defer wsConn.Close()
iowrappers.SynchronizeStreams(tcpConn, wsConn)
iowrappers.SynchronizeStreams("tcp -- ws", tcpConn, wsConn)
}

View File

@ -3,7 +3,6 @@ package session
import (
"bytes"
"converge/pkg/comms"
"converge/pkg/support/async"
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/gliderlabs/ssh"
@ -107,7 +106,7 @@ func ConfigureAgent(commChannel comms.CommChannel,
go func() {
for {
<-state.ticker.C
events <- async.Async(check)
events <- check
}
}()
go monitorHoldFile()
@ -121,11 +120,15 @@ func ConfigureAgent(commChannel comms.CommChannel,
}
func Login(sessionInfo comms.SessionInfo, sshSession ssh.Session) {
events <- async.Async(login, sessionInfo, sshSession)
events <- func() {
login(sessionInfo, sshSession)
}
}
func LogOut(clientId string) {
events <- async.Async(logOut, clientId)
events <- func() {
logOut(clientId)
}
}
// Internal interface synchronous
@ -148,7 +151,7 @@ func monitorHoldFile() {
}
base := filepath.Base(event.Name)
if base == holdFilename {
events <- async.Async(holdFileChange)
events <- holdFileChange
}
case err, ok := <-watcher.Errors:

View File

@ -21,6 +21,7 @@ type AgentInfo struct {
Hostname string
Pwd string
OS string
Shell string
}
type ClientInfo struct {
@ -72,7 +73,7 @@ type ConvergeMessage struct {
Value interface{}
}
func NewAgentInfo() AgentInfo {
func NewAgentInfo(shell string) AgentInfo {
username, _ := user.Current()
host, _ := os.Hostname()
pwd, _ := os.Getwd()
@ -81,6 +82,7 @@ func NewAgentInfo() AgentInfo {
Hostname: host,
Pwd: pwd,
OS: runtime.GOOS,
Shell: shell,
}
}

View File

@ -189,11 +189,12 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteA
return nil, fmt.Errorf("No agent found for PublicId '%s'", publicId)
}
agentConn, err := agent.commChannel.Session.Open()
agentConn, err := admin.getAgentConnection(agent)
if err != nil {
return nil, err
}
log.Println("Successful websocket connection to agent")
log.Println("Sending connection information to agent")
client := NewClient(publicId, clientConn, agentConn)
@ -212,6 +213,18 @@ func (admin *Admin) addClient(publicId string, clientConn iowrappers2.ReadWriteA
return client, nil
}
func (admin *Admin) getAgentConnection(agent *AgentConnection) (net.Conn, error) {
agentConn, err := agent.commChannel.Session.Open()
count := 0
for err != nil && count < 10 {
log.Printf("Retrying connection to agent: %v", err)
time.Sleep(250 * time.Millisecond)
count++
agentConn, err = agent.commChannel.Session.Open()
}
return agentConn, err
}
func (admin *Admin) RemoveAgent(publicId string) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
@ -312,6 +325,6 @@ func (admin *Admin) Connect(publicId string, conn iowrappers2.ReadWriteAddrClose
}()
log.Printf("Connecting client and agent: '%s'\n", publicId)
iowrappers2.SynchronizeStreams(client.client, client.agent)
iowrappers2.SynchronizeStreams("client -- agent", client.client, client.agent)
return nil
}

View File

@ -56,6 +56,8 @@ func (sessions *WebSessions) NewSession(wsConnection net.Conn) *WebSession {
}
func (session *WebSession) WriteNotifications() {
timer := time.NewTicker(10 * time.Second)
defer timer.Stop()
for {
select {
case notification, ok := <-session.notifications:
@ -73,8 +75,8 @@ func (session *WebSession) WriteNotifications() {
log.Printf("WS connection closed: %v", err)
return
}
case <-time.After(10 * time.Second):
_, err := session.conn.Write([]byte("<div>ping</div>"))
case <-timer.C:
_, err := session.conn.Write(make([]byte, 0, 0))
if err != nil {
log.Printf("WS connection closed: %v", err)
return
@ -92,5 +94,5 @@ func (sessions *WebSessions) SessionClosed(session *WebSession) {
}
func (sessions *WebSessions) logSessions() {
log.Printf("New web session, nsessions %d", len(sessions.sessions))
log.Printf("Web session count %d", len(sessions.sessions))
}

View File

@ -6,7 +6,7 @@ templ Sessions() {
<h1>sessions</h1>
<div id="mycontent">
Initial content
Loading...
</div>
</div>

View File

@ -26,11 +26,10 @@ templ Usage(secure string, host string, username string) {
Above, ID is a unique id for the job, the so-called rendez-cous ID. This should not conflict with IDs
used by other agents. The ID is used for a rendez-vous between the end-user on a local system and
the continuous integration job running on a build agent. If you don't specify an id, a random
id will be generated.
The agent to the converge server and tells it the ID. Clients can now connect to the Converge
server to establish a connection to the CI job through converge by also specifying the same
ID.
id will be generated. If you specify a duplicate ID, the server will generate a new one andd the
agent will tell you what id to use.
Clients can now connect to the Converge server with the ID to establish a connection to
the CI job through Converge.
Communication between
end-user and agent is encrypted using SSH and the rendez-vous server is unable to

View File

@ -1,18 +0,0 @@
package async
import "reflect"
func Async(fn interface{}, args ...interface{}) func() {
fnValue := reflect.ValueOf(fn)
// Prepare the arguments
params := make([]reflect.Value, len(args))
for i, arg := range args {
params[i] = reflect.ValueOf(arg)
}
// Return a function that, when called, will invoke the original function
return func() {
fnValue.Call(params)
}
}

View File

@ -5,7 +5,7 @@ import (
"log"
)
func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
func SynchronizeStreams(description string, stream1, stream2 io.ReadWriter) {
waitChannel := make(chan bool, 2)
go func() {
@ -14,7 +14,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
}()
_, err := io.Copy(stream1, stream2)
if err != nil {
log.Printf("SynchronizeStreamms: error(1) %v\n", err)
log.Printf("SynchronizeStreams: %s: error <-: %v\n", description, err)
}
}()
@ -24,10 +24,10 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
}()
_, err := io.Copy(stream2, stream1)
if err != nil {
log.Printf("SynchronizeStreams: error(2) %v\n", err)
log.Printf("SynchronizeStreams: %s: error ->: %v\n", description, err)
}
}()
<-waitChannel
log.Println("SynchronizeStreams: Connection closed")
log.Printf("SynchronizeStreams: %s: Connection closed", description)
}

View File

@ -33,7 +33,10 @@ func (websocketConn *WebSocketConn) Read(p []byte) (n int, err error) {
func (websocketConn *WebSocketConn) Write(p []byte) (n int, err error) {
messageType := websocket.BinaryMessage
if websocketConn.text {
switch {
case len(p) == 0:
messageType = websocket.PingMessage
case websocketConn.text:
messageType = websocket.TextMessage
}
err = websocketConn.conn.WriteMessage(messageType, p)