package comms

import (
	"encoding/gob"
	"fmt"
	"io"
	"log"
	"time"
)

// GOBChannel bundles a connection with a gob encoder and decoder that both
// operate on that connection.
type GOBChannel struct {
	// Peer can be any connection, including the ssh connection before it is
	// passed on to SSH during initialization of converge to agent communication.
	Peer    io.ReadWriter
	Encoder *gob.Encoder
	Decoder *gob.Decoder
}

// NewGOBChannel wraps conn in a GOBChannel, creating a fresh gob encoder and
// decoder that write to and read from conn.
func NewGOBChannel(conn io.ReadWriter) GOBChannel {
	return GOBChannel{
		Peer:    conn,
		Encoder: gob.NewEncoder(conn),
		Decoder: gob.NewDecoder(conn),
	}
}

// Send gob-encodes object onto the channel. An encoding failure is logged and
// also returned to the caller.
func Send(channel GOBChannel, object any) error {
	err := channel.Encoder.Encode(object)
	if err != nil {
		log.Printf("Encoding error %v", err)
	}
	return err
}

// Receive decodes the next gob value from the channel into a T. The returned
// value is whatever the decoder left in it when the error is non-nil, so it
// should be ignored in that case.
func Receive[T any](channel GOBChannel) (T, error) {
	var target T
	err := channel.Decoder.Decode(&target)
	return target, err
}

// Asynchronous send and receive on a single connection is guaranteed to preserve ordering of
// messages. We use asynchronous calls to avoid blocking indefinitely or depending on network timeouts.

// SendAsync sends obj on the channel from a new goroutine. Exactly one
// notification is delivered: the error on errors if the send failed, otherwise
// true on done.
//
// The goroutine blocks on that notification until it is received, so callers
// that may stop listening (e.g. after a timeout) must pass channels with at
// least one free buffer slot, or the goroutine never exits.
func SendAsync[T any](channel GOBChannel, obj T, done chan<- bool, errors chan<- error) {
	go func() {
		if err := Send(channel, obj); err != nil {
			errors <- err
			return
		}
		done <- true
	}()
}

// ReceiveAsync receives a T from the channel in a new goroutine. Exactly one
// notification is delivered: the error on errors if decoding failed, otherwise
// the decoded value on result.
//
// The goroutine blocks on that notification until it is received, so callers
// that may stop listening (e.g. after a timeout) must pass channels with at
// least one free buffer slot, or the goroutine never exits.
func ReceiveAsync[T any](channel GOBChannel, result chan T, errors chan<- error) {
	go func() {
		value, err := Receive[T](channel)
		if err != nil {
			errors <- err
			return
		}
		result <- value
	}()
}

// SendWithTimeout sends obj on the channel and waits at most MESSAGE_TIMEOUT
// for the send to complete. It returns nil on success, the send error on
// failure, or a timeout error if the deadline passes first.
//
// A timeout does not cancel the underlying write: the background goroutine
// keeps running until the encoder returns.
func SendWithTimeout[T any](channel GOBChannel, obj T) error {
	// Buffered so the sender goroutine can always deliver its outcome and
	// exit, even when the timeout below fires first and nobody is listening.
	done := make(chan bool, 1)
	errs := make(chan error, 1)

	SendAsync(channel, obj, done, errs)

	// An explicit timer is stopped on return instead of lingering until it fires.
	timer := time.NewTimer(MESSAGE_TIMEOUT)
	defer timer.Stop()

	select {
	case <-timer.C:
		return fmt.Errorf("Timeout in SendWithTimout")
	case err := <-errs:
		return err
	case <-done:
		return nil
	}
}

// ReceiveWithTimeout waits at most MESSAGE_TIMEOUT for a T to arrive on the
// channel. It returns the decoded value on success, or the zero value of T
// together with the decode error or a timeout error.
//
// A timeout does not cancel the underlying read: the background goroutine
// stays in the decoder until a message arrives or the read fails.
// NOTE(review): a message that arrives after the timeout is decoded by that
// goroutine and then dropped, so the next receive will not see it — confirm
// callers treat a timeout as fatal for the connection.
func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) {
	// Buffered so the receiver goroutine can always deliver its outcome and
	// exit, even when the timeout below fires first and nobody is listening.
	result := make(chan T, 1)
	errs := make(chan error, 1)

	ReceiveAsync(channel, result, errs)

	// An explicit timer is stopped on return instead of lingering until it fires.
	timer := time.NewTimer(MESSAGE_TIMEOUT)
	defer timer.Stop()

	var zero T
	select {
	case <-timer.C:
		return zero, fmt.Errorf("Timeout in ReceiveWithTimout")
	case err := <-errs:
		return zero, err
	case value := <-result:
		return value, nil
	}
}