activity detection implemented for sftp.
This commit is contained in:
parent
2198dae2ea
commit
3ca1f657b7
50
cmd/agent/activitydetector.go
Normal file
50
cmd/agent/activitydetector.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"converge/pkg/agent/session"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WriteDetector[T io.ReadWriter] struct {
|
||||||
|
session T
|
||||||
|
lastReportedTime time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *WriteDetector[T]) Read(p []byte) (int, error) {
|
||||||
|
return conn.session.Read(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *WriteDetector[T]) Write(p []byte) (int, error) {
|
||||||
|
n, err := conn.session.Write(p)
|
||||||
|
if err == nil && n > 0 {
|
||||||
|
// because downloads can generate a lot of write in a short time, we do some throttling
|
||||||
|
// It could also negatively impact download speed if we handle all the events.
|
||||||
|
now := time.Now()
|
||||||
|
if now.Sub(conn.lastReportedTime).Seconds() > 1 {
|
||||||
|
conn.lastReportedTime = now
|
||||||
|
session.UserActivityDetected()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWriteDetector(conn io.ReadWriter) *WriteDetector[io.ReadWriter] {
|
||||||
|
return &WriteDetector[io.ReadWriter]{session: conn}
|
||||||
|
}
|
||||||
|
|
||||||
|
type SftpActivityDetector struct {
|
||||||
|
WriteDetector[io.ReadWriteCloser]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *SftpActivityDetector) Close() error {
|
||||||
|
return conn.session.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSftpActivityDetector(conn io.ReadWriteCloser) *SftpActivityDetector {
|
||||||
|
return &SftpActivityDetector{
|
||||||
|
WriteDetector: WriteDetector[io.ReadWriteCloser]{
|
||||||
|
session: conn,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
@ -32,20 +32,25 @@ import (
|
|||||||
//go:embed hostkey.pem
|
//go:embed hostkey.pem
|
||||||
var hostPrivateKey []byte
|
var hostPrivateKey []byte
|
||||||
|
|
||||||
func SftpHandler(sess ssh.Session) {
|
func SftpHandler(sftpSession ssh.Session) {
|
||||||
sessionInfo := comms.NewSessionInfo(
|
sessionInfo := comms.NewSessionInfo(
|
||||||
sess.LocalAddr().String(),
|
sftpSession.LocalAddr().String(),
|
||||||
"sftp",
|
"sftp",
|
||||||
)
|
)
|
||||||
session.Login(sessionInfo, sess)
|
session.Login(sessionInfo, sftpSession)
|
||||||
defer session.LogOut(sessionInfo.ClientId)
|
defer session.LogOut(sessionInfo.ClientId)
|
||||||
|
|
||||||
debugStream := io.Discard
|
debugStream := io.Discard
|
||||||
serverOptions := []sftp.ServerOption{
|
serverOptions := []sftp.ServerOption{
|
||||||
sftp.WithDebug(debugStream),
|
sftp.WithDebug(debugStream),
|
||||||
}
|
}
|
||||||
|
// activity for sftp means that the server is sending data to the client.
|
||||||
|
// In contrast to activity detection for ssh we use activity detection based on
|
||||||
|
// serveractivity. This approach ensures that long downloads are not interrupted
|
||||||
|
// by a timeout and are allowed to finish.
|
||||||
|
activityDetector := NewSftpActivityDetector(sftpSession)
|
||||||
server, err := sftp.NewServer(
|
server, err := sftp.NewServer(
|
||||||
sess,
|
activityDetector,
|
||||||
serverOptions...,
|
serverOptions...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -60,22 +65,6 @@ func SftpHandler(sess ssh.Session) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type UserActivityDetector struct {
|
|
||||||
session io.ReadWriter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (user UserActivityDetector) Read(p []byte) (int, error) {
|
|
||||||
n, err := user.session.Read(p)
|
|
||||||
if err == nil && n > 0 {
|
|
||||||
session.UserActivityDetected()
|
|
||||||
}
|
|
||||||
return n, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (user UserActivityDetector) Write(p []byte) (int, error) {
|
|
||||||
return user.session.Write(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func sshServer(hostKeyFile string, shellCommand string,
|
func sshServer(hostKeyFile string, shellCommand string,
|
||||||
authorizedPublicKeys *AuthorizedPublicKeys) *ssh.Server {
|
authorizedPublicKeys *AuthorizedPublicKeys) *ssh.Server {
|
||||||
ssh.Handle(func(sshSession ssh.Session) {
|
ssh.Handle(func(sshSession ssh.Session) {
|
||||||
@ -89,10 +78,10 @@ func sshServer(hostKeyFile string, shellCommand string,
|
|||||||
sshSession.LocalAddr().String(), "ssh",
|
sshSession.LocalAddr().String(), "ssh",
|
||||||
)
|
)
|
||||||
session.Login(sessionInfo, sshSession)
|
session.Login(sessionInfo, sshSession)
|
||||||
activityDetector := UserActivityDetector{
|
// For SSH we detect activity when there are writes to the shell that was spanwedn.
|
||||||
session: sshSession,
|
// This means user input.
|
||||||
}
|
activityDetector := NewWriteDetector(process.Pipe())
|
||||||
iowrappers.SynchronizeStreams("shell -- ssh", process.Pipe(), activityDetector)
|
iowrappers.SynchronizeStreams("shell -- ssh", activityDetector, sshSession)
|
||||||
session.LogOut(sessionInfo.ClientId)
|
session.LogOut(sessionInfo.ClientId)
|
||||||
// will cause addition goroutines to remmain alive when the SSH
|
// will cause addition goroutines to remmain alive when the SSH
|
||||||
// session is killed. For now acceptable since the agent is a short-lived
|
// session is killed. For now acceptable since the agent is a short-lived
|
||||||
|
Loading…
Reference in New Issue
Block a user