Added pprof to convergeserver and optionally to

the agent if PPROF_PORT is set.

Fixed issue with converge server not cleaning up goroutines because of blocking channel. Made sure to create channels with > 1 size everywhere it can be done. The blocking behavior of a default channel size is mostly in the way.

Known issue: Killing the SSH client will lead to the server side process not being terminated and some goroutines still running in the agent. This would require additional investigation to solve. The remote processes are still being cleaned up ok (at least on linux) when the agent exits.

This should not be an issue at all since the agent is a short-lived process and when running in a containerized environment with containers running on demand the cleanup will definitely work.
This commit is contained in:
Erik Brakkee 2024-07-28 11:48:31 +02:00
parent 788050df32
commit e01a2bc729
8 changed files with 30 additions and 46 deletions

View File

@ -26,6 +26,7 @@ import (
"time" "time"
_ "embed" _ "embed"
_ "net/http/pprof"
) )
//go:embed hostkey.pem //go:embed hostkey.pem
@ -70,6 +71,11 @@ func sshServer(hostKeyFile string, shellCommand string,
agent.Login(uid, s) agent.Login(uid, s)
iowrappers.SynchronizeStreams(process.Pipe(), s) iowrappers.SynchronizeStreams(process.Pipe(), s)
agent.LogOut(uid) agent.LogOut(uid)
// will cause addition goroutines to remmain alive when the SSH
// session is killed. For now acceptable since the agent is a short-lived
// process. Using Kill() here will create defunct processes and in normal
// circummstances Wait() will be the best because the process will be shutting
// down automatically becuase it has lost its terminal.
process.Wait() process.Wait()
}) })
@ -186,6 +192,14 @@ func parseDuration(args []string, val string) (time.Duration, []string) {
func main() { func main() {
pprofPort, ok := os.LookupEnv("PPROF_PORT")
if ok {
log.Printf("Enablilng pprof on localhost:%s", pprofPort)
go func() {
log.Println(http.ListenAndServe("localhost:"+pprofPort, nil))
}()
}
id := "" id := ""
authorizedKeysFile := ".authorized_keys" authorizedKeysFile := ".authorized_keys"
advanceWarningTime := 5 * time.Minute advanceWarningTime := 5 * time.Minute

View File

@ -13,6 +13,8 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
_ "net/http/pprof"
) )
func parsePublicId(path string) (publicId string, _ error) { func parsePublicId(path string) (publicId string, _ error) {

View File

@ -54,5 +54,5 @@ func main() {
} }
// Block main goroutine forever // Block main goroutine forever
<-make(chan struct{}) <-make(chan struct{}, 10)
} }

View File

@ -26,7 +26,6 @@ import (
// When default is used, channel will block always and thereby // When default is used, channel will block always and thereby
// effectively serializing everything. // effectively serializing everything.
// //
// make(chan func())
// global configuration // global configuration

View File

@ -25,6 +25,7 @@ const (
func NewCommChannel(role Role, wsConn io.ReadWriteCloser) (CommChannel, error) { func NewCommChannel(role Role, wsConn io.ReadWriteCloser) (CommChannel, error) {
var commChannel CommChannel var commChannel CommChannel
switch role { switch role {
case Agent: case Agent:
listener, err := yamux.Server(wsConn, nil) listener, err := yamux.Server(wsConn, nil)
@ -220,9 +221,9 @@ func CheckProtocolVersion(role Role, channel GOBChannel) error {
func CheckProtocolVersionOld(role Role, conn io.ReadWriter) error { func CheckProtocolVersionOld(role Role, conn io.ReadWriter) error {
channel := NewGOBChannel(conn) channel := NewGOBChannel(conn)
sends := make(chan bool) sends := make(chan bool, 10)
receives := make(chan ProtocolVersion) receives := make(chan ProtocolVersion, 10)
errors := make(chan error) errors := make(chan error, 10)
SendAsync(channel, ProtocolVersion{Version: PROTOCOL_VERSION}, sends, errors) SendAsync(channel, ProtocolVersion{Version: PROTOCOL_VERSION}, sends, errors)
ReceiveAsync(channel, receives, errors) ReceiveAsync(channel, receives, errors)

View File

@ -64,8 +64,8 @@ func ReceiveAsync[T any](channel GOBChannel, result chan T, errors chan<- error)
} }
func SendWithTimeout[T any](channel GOBChannel, obj T) error { func SendWithTimeout[T any](channel GOBChannel, obj T) error {
done := make(chan bool) done := make(chan bool, 10)
errors := make(chan error) errors := make(chan error, 10)
SendAsync(channel, obj, done, errors) SendAsync(channel, obj, done, errors)
select { select {
@ -79,8 +79,8 @@ func SendWithTimeout[T any](channel GOBChannel, obj T) error {
} }
func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) { func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) {
result := make(chan T) result := make(chan T, 10)
errors := make(chan error) errors := make(chan error, 10)
ReceiveAsync(channel, result, errors) ReceiveAsync(channel, result, errors)
select { select {

View File

@ -6,7 +6,7 @@ import (
) )
func SynchronizeStreams(stream1, stream2 io.ReadWriter) { func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
waitChannel := make(chan bool) waitChannel := make(chan bool, 2)
go func() { go func() {
defer func() { defer func() {
@ -14,7 +14,7 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
}() }()
_, err := io.Copy(stream1, stream2) _, err := io.Copy(stream1, stream2)
if err != nil { if err != nil {
log.Printf("sync streams error(1) %v\n", err) log.Printf("SynchronizeStreamms: error(1) %v\n", err)
} }
}() }()
@ -23,7 +23,9 @@ func SynchronizeStreams(stream1, stream2 io.ReadWriter) {
waitChannel <- true waitChannel <- true
}() }()
_, err := io.Copy(stream2, stream1) _, err := io.Copy(stream2, stream1)
log.Printf("sync streams error(2) %v\n", err) if err != nil {
log.Printf("SynchronizeStreams: error(2) %v\n", err)
}
}() }()
<-waitChannel <-waitChannel

View File

@ -1,34 +0,0 @@
package websocketutil
import (
"log"
"net"
)
type WebSocketListener struct {
connections chan net.Conn
}
func NewWebSocketListener() WebSocketListener {
return WebSocketListener{
connections: make(chan net.Conn),
}
}
func (listener WebSocketListener) Accept() (net.Conn, error) {
conn := <-listener.connections
log.Printf("Got client connection: %v\n", conn)
return conn, nil
}
func (listener *WebSocketListener) NewConnection(conn net.Conn) {
listener.connections <- conn
}
func (listener WebSocketListener) Close() error {
return nil
}
func (listener WebSocketListener) Addr() net.Addr {
return WebSocketAddr("rendez-vous")
}