From d17ad9bc3e11f896ac72bdd37745e14ac2de7820 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sun, 28 Jul 2024 11:48:31 +0200 Subject: [PATCH] Added pprof to convergeserver and optionally to the agent if PPROF_PORT is set. Fixed issue with converge server not cleaning up goroutines because of blocking channel. Made sure to create channels with > 1 size everywhere it can be done. The blocking behavior of a default channel size is mostly in the way. Known issue: Killing the SSH client will lead to the server side process not being terminated and some goroutines still running in the agent. This would require additional investigation to solve. The remote processes are still being cleaned up ok (at least on linux) when the agent exits. This should not be an issue at all since the agent is a short-lived process and when running in a containerized environment with containers running on demand the cleanup will definitely work. --- cmd/agent/agent.go | 14 ++++++++++++++ cmd/converge/converge.go | 2 ++ cmd/fsnotifytest/monitor.go | 2 +- pkg/agent/session.go | 1 - pkg/comms/agentserver.go | 7 ++++--- pkg/comms/gobchannel.go | 8 ++++---- pkg/iowrappers/sync.go | 8 +++++--- pkg/websocketutil/listener.go | 34 ---------------------------------- 8 files changed, 30 insertions(+), 46 deletions(-) delete mode 100644 pkg/websocketutil/listener.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index 4887672..6da5291 100755 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -26,6 +26,7 @@ import ( "time" _ "embed" + _ "net/http/pprof" ) //go:embed hostkey.pem @@ -70,6 +71,11 @@ func sshServer(hostKeyFile string, shellCommand string, agent.Login(uid, s) iowrappers.SynchronizeStreams(process.Pipe(), s) agent.LogOut(uid) + // will cause additional goroutines to remain alive when the SSH + // session is killed. For now acceptable since the agent is a short-lived + // process. 
Using Kill() here will create defunct processes and in normal + // circumstances Wait() will be the best because the process will be shutting + // down automatically because it has lost its terminal. process.Wait() }) @@ -186,6 +192,14 @@ func parseDuration(args []string, val string) (time.Duration, []string) { func main() { + pprofPort, ok := os.LookupEnv("PPROF_PORT") + if ok { + log.Printf("Enablilng pprof on localhost:%s", pprofPort) + go func() { + log.Println(http.ListenAndServe("localhost:"+pprofPort, nil)) + }() + } + id := "" authorizedKeysFile := ".authorized_keys" advanceWarningTime := 5 * time.Minute diff --git a/cmd/converge/converge.go b/cmd/converge/converge.go index 8cb6910..b68be8c 100644 --- a/cmd/converge/converge.go +++ b/cmd/converge/converge.go @@ -13,6 +13,8 @@ import ( "regexp" "strconv" "strings" + + _ "net/http/pprof" ) func parsePublicId(path string) (publicId string, _ error) { diff --git a/cmd/fsnotifytest/monitor.go b/cmd/fsnotifytest/monitor.go index 7d1622b..9b450b4 100644 --- a/cmd/fsnotifytest/monitor.go +++ b/cmd/fsnotifytest/monitor.go @@ -54,5 +54,5 @@ func main() { } // Block main goroutine forever - <-make(chan struct{}) + <-make(chan struct{}, 10) } diff --git a/pkg/agent/session.go b/pkg/agent/session.go index b39db51..10774c2 100644 --- a/pkg/agent/session.go +++ b/pkg/agent/session.go @@ -26,7 +26,6 @@ import ( // When default is used, channel will block always and thereby // effectively serializing everything. 
// -// make(chan func()) // global configuration diff --git a/pkg/comms/agentserver.go b/pkg/comms/agentserver.go index a73e164..a1ebca8 100644 --- a/pkg/comms/agentserver.go +++ b/pkg/comms/agentserver.go @@ -25,6 +25,7 @@ const ( func NewCommChannel(role Role, wsConn io.ReadWriteCloser) (CommChannel, error) { var commChannel CommChannel + switch role { case Agent: listener, err := yamux.Server(wsConn, nil) @@ -220,9 +221,9 @@ func CheckProtocolVersion(role Role, channel GOBChannel) error { func CheckProtocolVersionOld(role Role, conn io.ReadWriter) error { channel := NewGOBChannel(conn) - sends := make(chan bool) - receives := make(chan ProtocolVersion) - errors := make(chan error) + sends := make(chan bool, 10) + receives := make(chan ProtocolVersion, 10) + errors := make(chan error, 10) SendAsync(channel, ProtocolVersion{Version: PROTOCOL_VERSION}, sends, errors) ReceiveAsync(channel, receives, errors) diff --git a/pkg/comms/gobchannel.go b/pkg/comms/gobchannel.go index 854112c..601126d 100644 --- a/pkg/comms/gobchannel.go +++ b/pkg/comms/gobchannel.go @@ -64,8 +64,8 @@ func ReceiveAsync[T any](channel GOBChannel, result chan T, errors chan<- error) } func SendWithTimeout[T any](channel GOBChannel, obj T) error { - done := make(chan bool) - errors := make(chan error) + done := make(chan bool, 10) + errors := make(chan error, 10) SendAsync(channel, obj, done, errors) select { @@ -79,8 +79,8 @@ func SendWithTimeout[T any](channel GOBChannel, obj T) error { } func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) { - result := make(chan T) - errors := make(chan error) + result := make(chan T, 10) + errors := make(chan error, 10) ReceiveAsync(channel, result, errors) select { diff --git a/pkg/iowrappers/sync.go b/pkg/iowrappers/sync.go index 4f3af2c..7d7667e 100644 --- a/pkg/iowrappers/sync.go +++ b/pkg/iowrappers/sync.go @@ -6,7 +6,7 @@ import ( ) func SynchronizeStreams(stream1, stream2 io.ReadWriter) { - waitChannel := make(chan bool) + waitChannel := 
make(chan bool, 2) go func() { defer func() { @@ -14,7 +14,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { }() _, err := io.Copy(stream1, stream2) if err != nil { - log.Printf("sync streams error(1) %v\n", err) + log.Printf("SynchronizeStreamms: error(1) %v\n", err) } }() @@ -23,7 +23,9 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) { waitChannel <- true }() _, err := io.Copy(stream2, stream1) - log.Printf("sync streams error(2) %v\n", err) + if err != nil { + log.Printf("SynchronizeStreams: error(2) %v\n", err) + } }() <-waitChannel diff --git a/pkg/websocketutil/listener.go b/pkg/websocketutil/listener.go deleted file mode 100644 index ef82ea7..0000000 --- a/pkg/websocketutil/listener.go +++ /dev/null @@ -1,34 +0,0 @@ -package websocketutil - -import ( - "log" - "net" -) - -type WebSocketListener struct { - connections chan net.Conn -} - -func NewWebSocketListener() WebSocketListener { - return WebSocketListener{ - connections: make(chan net.Conn), - } -} - -func (listener WebSocketListener) Accept() (net.Conn, error) { - conn := <-listener.connections - log.Printf("Got client connection: %v\n", conn) - return conn, nil -} - -func (listener *WebSocketListener) NewConnection(conn net.Conn) { - listener.connections <- conn -} - -func (listener WebSocketListener) Close() error { - return nil -} - -func (listener WebSocketListener) Addr() net.Addr { - return WebSocketAddr("rendez-vous") -}