activity detection implemented for sftp.

This commit is contained in:
Erik Brakkee 2024-08-11 13:43:59 +02:00
parent b8715bcbe8
commit f82b21b845
2 changed files with 63 additions and 24 deletions

View File

@ -0,0 +1,50 @@
package main
import (
"converge/pkg/agent/session"
"io"
"time"
)
type WriteDetector[T io.ReadWriter] struct {
session T
lastReportedTime time.Time
}
func (conn *WriteDetector[T]) Read(p []byte) (int, error) {
return conn.session.Read(p)
}
func (conn *WriteDetector[T]) Write(p []byte) (int, error) {
n, err := conn.session.Write(p)
if err == nil && n > 0 {
// because downloads can generate a lot of write in a short time, we do some throttling
// It could also negatively impact download speed if we handle all the events.
now := time.Now()
if now.Sub(conn.lastReportedTime).Seconds() > 1 {
conn.lastReportedTime = now
session.UserActivityDetected()
}
}
return n, err
}
func NewWriteDetector(conn io.ReadWriter) *WriteDetector[io.ReadWriter] {
return &WriteDetector[io.ReadWriter]{session: conn}
}
type SftpActivityDetector struct {
WriteDetector[io.ReadWriteCloser]
}
func (conn *SftpActivityDetector) Close() error {
return conn.session.Close()
}
func NewSftpActivityDetector(conn io.ReadWriteCloser) *SftpActivityDetector {
return &SftpActivityDetector{
WriteDetector: WriteDetector[io.ReadWriteCloser]{
session: conn,
},
}
}

View File

@ -32,20 +32,25 @@ import (
//go:embed hostkey.pem
var hostPrivateKey []byte
func SftpHandler(sess ssh.Session) {
func SftpHandler(sftpSession ssh.Session) {
sessionInfo := comms.NewSessionInfo(
sess.LocalAddr().String(),
sftpSession.LocalAddr().String(),
"sftp",
)
session.Login(sessionInfo, sess)
session.Login(sessionInfo, sftpSession)
defer session.LogOut(sessionInfo.ClientId)
debugStream := io.Discard
serverOptions := []sftp.ServerOption{
sftp.WithDebug(debugStream),
}
// activity for sftp means that the server is sending data to the client.
// In contrast to activity detection for ssh we use activity detection based on
// serveractivity. This approach ensures that long downloads are not interrupted
// by a timeout and are allowed to finish.
activityDetector := NewSftpActivityDetector(sftpSession)
server, err := sftp.NewServer(
sess,
activityDetector,
serverOptions...,
)
if err != nil {
@ -60,22 +65,6 @@ func SftpHandler(sess ssh.Session) {
}
}
type UserActivityDetector struct {
session io.ReadWriter
}
func (user UserActivityDetector) Read(p []byte) (int, error) {
n, err := user.session.Read(p)
if err == nil && n > 0 {
session.UserActivityDetected()
}
return n, err
}
func (user UserActivityDetector) Write(p []byte) (int, error) {
return user.session.Write(p)
}
func sshServer(hostKeyFile string, shellCommand string,
authorizedPublicKeys *AuthorizedPublicKeys) *ssh.Server {
ssh.Handle(func(sshSession ssh.Session) {
@ -89,10 +78,10 @@ func sshServer(hostKeyFile string, shellCommand string,
sshSession.LocalAddr().String(), "ssh",
)
session.Login(sessionInfo, sshSession)
activityDetector := UserActivityDetector{
session: sshSession,
}
iowrappers.SynchronizeStreams("shell -- ssh", process.Pipe(), activityDetector)
// For SSH we detect activity when there are writes to the shell that was spanwedn.
// This means user input.
activityDetector := NewWriteDetector(process.Pipe())
iowrappers.SynchronizeStreams("shell -- ssh", activityDetector, sshSession)
session.LogOut(sessionInfo.ClientId)
// will cause addition goroutines to remmain alive when the SSH
// session is killed. For now acceptable since the agent is a short-lived