converge/pkg/comms/gobchannel.go
Erik Brakkee d17ad9bc3e Added pprof to convergeserver and optionally to
the agent if PPROF_PORT is set.

Fixed issue with converge server not cleaning up goroutines because of blocking channel. Made sure to create channels with > 1 size everywhere it can be done. The blocking behavior of a default channel size is mostly in the way.

Known issue: Killing the SSH client will lead to the server side process not being terminated and some goroutines still running in the agent. This would require additional investigation to solve. The remote processes are still being cleaned up ok (at least on linux) when the agent exits.

This should not be an issue at all since the agent is a short-lived process and when running in a containerized environment with containers running on demand the cleanup will definitely work.
2024-07-28 11:48:31 +02:00

95 lines
2.0 KiB
Go

package comms
import (
"encoding/gob"
"fmt"
"io"
"log"
"time"
)
type GOBChannel struct {
// can be any connection, including the ssh connnection before it is
// passed on to SSH during initialization of converge to agent communication
Peer io.ReadWriter
Encoder *gob.Encoder
Decoder *gob.Decoder
}
func NewGOBChannel(conn io.ReadWriter) GOBChannel {
return GOBChannel{
Peer: conn,
Encoder: gob.NewEncoder(conn),
Decoder: gob.NewDecoder(conn),
}
}
func Send(channel GOBChannel, object any) error {
err := channel.Encoder.Encode(object)
if err != nil {
log.Printf("Encoding error %v", err)
}
return err
}
func Receive[T any](channel GOBChannel) (T, error) {
target := *new(T)
err := channel.Decoder.Decode(&target)
return target, err
}
// Asynchronous send and receive on a single connection is guaranteed to preserver ordering of
// messages. We use asynchronous to void blocking indefinitely or depending on network timeouts.
func SendAsync[T any](channel GOBChannel, obj T, done chan<- bool, errors chan<- error) {
go func() {
err := Send(channel, obj)
if err != nil {
errors <- err
} else {
done <- true
}
}()
}
func ReceiveAsync[T any](channel GOBChannel, result chan T, errors chan<- error) {
go func() {
value, err := Receive[T](channel)
if err != nil {
errors <- err
} else {
result <- value
}
}()
}
func SendWithTimeout[T any](channel GOBChannel, obj T) error {
done := make(chan bool, 10)
errors := make(chan error, 10)
SendAsync(channel, obj, done, errors)
select {
case <-time.After(MESSAGE_TIMEOUT):
return fmt.Errorf("Timeout in SwndWithTimout")
case err := <-errors:
return err
case <-done:
return nil
}
}
func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) {
result := make(chan T, 10)
errors := make(chan error, 10)
ReceiveAsync(channel, result, errors)
select {
case <-time.After(MESSAGE_TIMEOUT):
return *new(T), fmt.Errorf("Timeout in ReceiveWithTimout")
case err := <-errors:
return *new(T), err
case value := <-result:
return value, nil
}
}