a lot of progress in setting up tests for the communication.

Wrote ChannelReadWriter that simulates a connection inmemory.
This is used by the agentserver test for testing the initialization. The
first test is already working.
This commit is contained in:
Erik Brakkee 2024-08-19 22:31:02 +02:00
parent b95314c964
commit 3867b0432d
10 changed files with 326 additions and 24 deletions

View File

@ -0,0 +1,56 @@
package comms
import (
"context"
"git.wamblee.org/converge/pkg/support/iowrappers"
"github.com/stretchr/testify/suite"
"log"
"sync"
"testing"
)
type AgentServerTestSuite struct {
suite.Suite
}
func (suite *AgentServerTestSuite) SetupTest() {
}
func (suite *AgentServerTestSuite) TearDownTest() {
}
func TestAgentServerTestSuite(t *testing.T) {
suite.Run(t, &AgentServerTestSuite{})
}
func (suite *AgentServerTestSuite) TestNewCommChannel() {
bitpipe := iowrappers.NewInmemoryConnection(context.Background(), "inmemory")
agentConnection := bitpipe.Front()
serverConnection := bitpipe.Back()
requires := suite.Require()
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
log.Println("Agent initializing")
commChannel, err := NewCommChannel(Agent, agentConnection)
requires.Nil(err)
protocolVersion := ProtocolVersion{Version: 10}
err = SendWithTimeout[ProtocolVersion](commChannel.SideChannel, protocolVersion)
requires.Nil(err)
log.Printf("Sent one message %v", protocolVersion)
wg.Done()
}()
go func() {
log.Println("Server initializing")
commChannel, err := NewCommChannel(ConvergeServer, serverConnection)
requires.Nil(err)
protocolVersion, err := ReceiveWithTimeout[ProtocolVersion](commChannel.SideChannel)
requires.Nil(err)
log.Printf("Received one message %v", protocolVersion)
wg.Done()
}()
wg.Wait()
}

View File

@ -0,0 +1,19 @@
package iowrappers
type dummyRemoteAddr string
func (r dummyRemoteAddr) Network() string {
return string(r)
}
func (r dummyRemoteAddr) String() string {
return string(r)
}
// A bitpipe is mainly a test utility. It uses byte buffers to perform bi-directional
// communication. The test code can read and write from one side of the bitpipe and the
// code under test reads the other side.
type BitPipe interface {
Front() ReadWriteAddrCloser
Back() ReadWriteAddrCloser
}

View File

@ -0,0 +1,64 @@
package iowrappers
import (
"context"
"io"
"log"
)
type ChannelReadWriter struct {
ctx context.Context
receiver <-chan []byte
// bytes that were read and that did not fit
readBuf []byte
sender chan<- []byte
closed bool
}
func NewChannelReadWriter(ctx context.Context, receiver <-chan []byte,
sender chan<- []byte) *ChannelReadWriter {
return &ChannelReadWriter{
ctx: ctx,
receiver: receiver,
sender: sender,
closed: false,
}
}
func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
nread := copy(p, rw.readBuf)
if nread > 0 {
rw.readBuf = rw.readBuf[nread:]
return nread, nil
}
select {
case <-rw.ctx.Done():
log.Println("Context was canceled")
return 0, io.EOF
case data, ok := <-rw.receiver:
if !ok {
log.Println("Channel closed")
return 0, io.EOF
}
nread = copy(p, data)
rw.readBuf = data[nread:]
return nread, nil
}
}
func (rw *ChannelReadWriter) Write(p []byte) (n int, err error) {
select {
case <-rw.ctx.Done():
return 0, io.EOF
case rw.sender <- p:
}
return len(p), nil
}
func (rw *ChannelReadWriter) Close() error {
if !rw.closed {
close(rw.sender)
rw.closed = true
}
return nil
}

View File

@ -0,0 +1,105 @@
package iowrappers
import (
"context"
"github.com/stretchr/testify/suite"
"log"
"sync"
"testing"
"time"
)
type ChannelReadWriterTestSuite struct {
suite.Suite
ctx context.Context
cancelFunc context.CancelFunc
toChannel chan<- []byte
fromChannel <-chan []byte
conn *ChannelReadWriter
}
func TestChannelReadWriterSuite(t *testing.T) {
suite.Run(t, &ChannelReadWriterTestSuite{})
}
func (suite *ChannelReadWriterTestSuite) createChannel() {
toChannel := make(chan []byte)
fromChannel := make(chan []byte)
suite.toChannel = toChannel
suite.fromChannel = fromChannel
ctx, cancelFunc := context.WithCancel(context.Background())
ctx, timeoutCancelFunc := context.WithTimeout(ctx, 10*time.Second)
suite.ctx = ctx
suite.cancelFunc = func() {
timeoutCancelFunc()
cancelFunc()
}
suite.cancelFunc = cancelFunc
suite.conn = NewChannelReadWriter(ctx, toChannel, fromChannel)
}
func (suite *ChannelReadWriterTestSuite) SetupTest() {
suite.createChannel()
}
func (suite *ChannelReadWriterTestSuite) TearDownTest() {
suite.cancelFunc()
}
type TestFunc func() any
func runAndWait(functions ...TestFunc) []any {
wg := sync.WaitGroup{}
wg.Add(len(functions))
res := make([]any, len(functions))
for i, function := range functions {
go func() {
res[i] = function()
wg.Done()
}()
}
wg.Wait()
return res
}
func (suite *ChannelReadWriterTestSuite) Test_SlicesLargeEnough() {
requires := suite.Require()
data := []byte("hello")
runAndWait(
func() any {
suite.toChannel <- data
log.Println("data sent")
return nil
},
func() any {
buf := make([]byte, len(data)*2)
n, err := suite.conn.Read(buf)
requires.Nil(err)
requires.Equal(n, len(data))
requires.Equal(data, buf[:n])
return nil
},
)
}
func (suite *ChannelReadWriterTestSuite) Test_SliceTooSmallFullReadInTwoParts() {
suite.FailNow("todo")
}
func (suite *ChannelReadWriterTestSuite) Test_SliceTooSmallFullREadInManyParts() {
suite.FailNow("todo")
}
func (suite *ChannelReadWriterTestSuite) Test_Close() {
suite.FailNow("todo")
}
func (suite *ChannelReadWriterTestSuite) Test_CloseTwice() {
suite.FailNow("todo")
}
func (suite *ChannelReadWriterTestSuite) Test_ContextCanceled() {
suite.FailNow("todo")
}

View File

@ -0,0 +1,32 @@
package iowrappers
import "context"
type InmemoryConnection struct {
ctx context.Context
frontToBack chan ([]byte)
backToFront chan ([]byte)
addr string
}
func NewInmemoryConnection(ctx context.Context, addr string) *InmemoryConnection {
pipe := InmemoryConnection{
ctx: ctx,
frontToBack: make(chan []byte),
backToFront: make(chan []byte),
addr: addr,
}
return &pipe
}
func (bitpipe *InmemoryConnection) Front() *ChannelReadWriter {
return pipe(bitpipe.ctx, bitpipe.backToFront, bitpipe.frontToBack, bitpipe.addr)
}
func (bitpipe *InmemoryConnection) Back() *ChannelReadWriter {
return pipe(bitpipe.ctx, bitpipe.frontToBack, bitpipe.backToFront, bitpipe.addr)
}
func pipe(ctx context.Context, receiveBuffer <-chan []byte, sendBuffer chan<- []byte, remoteAddr string) *ChannelReadWriter {
return NewChannelReadWriter(ctx, receiveBuffer, sendBuffer)
}

View File

@ -1,24 +0,0 @@
package iowrappers
import (
"io"
"net"
)
type ReadWriteAddrCloser interface {
io.ReadWriteCloser
RemoteAddr() net.Addr
}
type ReadWriterCombiner struct {
io.Reader
io.Writer
}
func (rw *ReadWriterCombiner) Read(p []byte) (n int, err error) {
return rw.Reader.Read(p)
}
func (rw *ReadWriterCombiner) Write(p []byte) (n int, err error) {
return rw.Writer.Write(p)
}

View File

@ -0,0 +1,12 @@
package iowrappers
import (
"io"
"net"
)
type ReadWriteAddrCloser interface {
io.ReadWriteCloser
RemoteAddr() net.Addr
}

View File

@ -0,0 +1,37 @@
package iowrappers
import (
"io"
"net"
)
type ReadWriterAddrCombiner struct {
reader io.Reader
writer io.Writer
closer func() error
remoteAddr net.Addr
}
func (rw *ReadWriterAddrCombiner) Read(p []byte) (n int, err error) {
return rw.reader.Read(p)
}
func NewReadWriterAddrCombiner(reader io.Reader, writer io.Writer,
closer func() error, remoteAddr net.Addr) *ReadWriterAddrCombiner {
return &ReadWriterAddrCombiner{
reader: reader,
writer: writer,
closer: closer,
remoteAddr: remoteAddr,
}
}
func (rw *ReadWriterAddrCombiner) Write(p []byte) (n int, err error) {
return rw.writer.Write(p)
}
func (rw *ReadWriterAddrCombiner) Close() error {
return rw.closer()
}
func (rw *ReadWriterAddrCombiner) RemoteAddr() net.Addr {
return rw.remoteAddr
}

View File

@ -0,0 +1 @@
package iowrappers