From f82b21b845d64609caf5138661689d1bb4ec1378 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sun, 11 Aug 2024 13:43:59 +0200 Subject: [PATCH] activity detection implemented for sftp. --- cmd/agent/activitydetector.go | 50 +++++++++++++++++++++++++++++++++++ cmd/agent/agent.go | 37 +++++++++----------------- 2 files changed, 63 insertions(+), 24 deletions(-) create mode 100644 cmd/agent/activitydetector.go diff --git a/cmd/agent/activitydetector.go b/cmd/agent/activitydetector.go new file mode 100644 index 0000000..5fa130b --- /dev/null +++ b/cmd/agent/activitydetector.go @@ -0,0 +1,50 @@ +package main + +import ( + "converge/pkg/agent/session" + "io" + "time" +) + +type WriteDetector[T io.ReadWriter] struct { + session T + lastReportedTime time.Time +} + +func (conn *WriteDetector[T]) Read(p []byte) (int, error) { + return conn.session.Read(p) +} + +func (conn *WriteDetector[T]) Write(p []byte) (int, error) { + n, err := conn.session.Write(p) + if err == nil && n > 0 { + // because downloads can generate a lot of write in a short time, we do some throttling + // It could also negatively impact download speed if we handle all the events. + now := time.Now() + if now.Sub(conn.lastReportedTime).Seconds() > 1 { + conn.lastReportedTime = now + session.UserActivityDetected() + } + } + return n, err +} + +func NewWriteDetector(conn io.ReadWriter) *WriteDetector[io.ReadWriter] { + return &WriteDetector[io.ReadWriter]{session: conn} +} + +type SftpActivityDetector struct { + WriteDetector[io.ReadWriteCloser] +} + +func (conn *SftpActivityDetector) Close() error { + return conn.session.Close() +} + +func NewSftpActivityDetector(conn io.ReadWriteCloser) *SftpActivityDetector { + return &SftpActivityDetector{ + WriteDetector: WriteDetector[io.ReadWriteCloser]{ + session: conn, + }, + } +} diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index 5bd21f0..d720a35 100755 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -32,20 +32,25 @@ import ( //go:embed hostkey.pem var hostPrivateKey []byte -func SftpHandler(sess ssh.Session) { +func SftpHandler(sftpSession ssh.Session) { sessionInfo := comms.NewSessionInfo( - sess.LocalAddr().String(), + sftpSession.LocalAddr().String(), "sftp", ) - session.Login(sessionInfo, sess) + session.Login(sessionInfo, sftpSession) defer session.LogOut(sessionInfo.ClientId) debugStream := io.Discard serverOptions := []sftp.ServerOption{ sftp.WithDebug(debugStream), } + // activity for sftp means that the server is sending data to the client. + // In contrast to activity detection for ssh we use activity detection based on + // serveractivity. This approach ensures that long downloads are not interrupted + // by a timeout and are allowed to finish. + activityDetector := NewSftpActivityDetector(sftpSession) server, err := sftp.NewServer( - sess, + activityDetector, serverOptions..., ) if err != nil { @@ -60,22 +65,6 @@ func SftpHandler(sess ssh.Session) { } } -type UserActivityDetector struct { - session io.ReadWriter -} - -func (user UserActivityDetector) Read(p []byte) (int, error) { - n, err := user.session.Read(p) - if err == nil && n > 0 { - session.UserActivityDetected() - } - return n, err -} - -func (user UserActivityDetector) Write(p []byte) (int, error) { - return user.session.Write(p) -} - func sshServer(hostKeyFile string, shellCommand string, authorizedPublicKeys *AuthorizedPublicKeys) *ssh.Server { ssh.Handle(func(sshSession ssh.Session) { @@ -89,10 +78,10 @@ func sshServer(hostKeyFile string, shellCommand string, sshSession.LocalAddr().String(), "ssh", ) session.Login(sessionInfo, sshSession) - activityDetector := UserActivityDetector{ - session: sshSession, - } - iowrappers.SynchronizeStreams("shell -- ssh", process.Pipe(), activityDetector) + // For SSH we detect activity when there are writes to the shell that was spanwedn. + // This means user input. + activityDetector := NewWriteDetector(process.Pipe()) + iowrappers.SynchronizeStreams("shell -- ssh", activityDetector, sshSession) session.LogOut(sessionInfo.ClientId) // will cause addition goroutines to remmain alive when the SSH // session is killed. For now acceptable since the agent is a short-lived