converge/pkg/comms/gobchannel.go
2024-08-13 21:33:29 +02:00

95 lines
2.0 KiB
Go

package comms
import (
"encoding/gob"
"fmt"
"io"
"log"
"time"
)
type GOBChannel struct {
// can be any connection, including the ssh connnection before it is
// passed on to SSH during initialization of converge to agent communication
Peer io.ReadWriter
Encoder *gob.Encoder
Decoder *gob.Decoder
}
func NewGOBChannel(conn io.ReadWriter) GOBChannel {
return GOBChannel{
Peer: conn,
Encoder: gob.NewEncoder(conn),
Decoder: gob.NewDecoder(conn),
}
}
func Send(channel GOBChannel, object any) error {
err := channel.Encoder.Encode(object)
if err != nil {
log.Printf("Encoding error %v", err)
}
return err
}
func Receive[T any](channel GOBChannel) (T, error) {
target := *new(T)
err := channel.Decoder.Decode(&target)
return target, err
}
// Asynchronous send and receive on a single connection is guaranteed to preserver ordering of
// messages. We use asynchronous to void blocking indefinitely or depending on network timeouts.
func SendAsync[T any](channel GOBChannel, obj T, done chan<- bool, errors chan<- error) {
go func() {
err := Send(channel, obj)
if err != nil {
errors <- err
} else {
done <- true
}
}()
}
func ReceiveAsync[T any](channel GOBChannel, result chan T, errors chan<- error) {
go func() {
value, err := Receive[T](channel)
if err != nil {
errors <- err
} else {
result <- value
}
}()
}
func SendWithTimeout[T any](channel GOBChannel, obj T) error {
done := make(chan bool)
errors := make(chan error)
SendAsync(channel, obj, done, errors)
select {
case <-time.After(MESSAGE_TIMEOUT):
return fmt.Errorf("Timeout in SwndWithTimout")
case err := <-errors:
return err
case <-done:
return nil
}
}
func ReceiveWithTimeout[T any](channel GOBChannel) (T, error) {
result := make(chan T)
errors := make(chan error)
ReceiveAsync(channel, result, errors)
select {
case <-time.After(MESSAGE_TIMEOUT):
return *new(T), fmt.Errorf("Timeout in ReceiveWithTimout")
case err := <-errors:
return *new(T), err
case value := <-result:
return value, nil
}
}