From 3867b0432d2ea31febb13a435553bbd1eaeb6308 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Mon, 19 Aug 2024 22:31:02 +0200 Subject: [PATCH] a lot of progress in setting up tests for the communication. Wrote ChannelReadWriter that simulates a connection inmemory. This is used by the agentserver test for testing the initialization. The first test is already working. --- pkg/comms/agentserver_test.go | 56 ++++++++++ pkg/support/iowrappers/bitpipe.go | 19 ++++ .../iowrappers/channelreadwritecloser.go | 64 +++++++++++ .../iowrappers/channelreadwriter_test.go | 105 ++++++++++++++++++ pkg/support/iowrappers/inmemoryconnection.go | 32 ++++++ pkg/support/iowrappers/io.go | 24 ---- pkg/support/iowrappers/readwriteaddrcloser.go | 12 ++ .../iowrappers/readwriteaddrcombiner.go | 37 ++++++ pkg/support/iowrappers/readwritercombiner.go | 1 + .../{sync.go => synchronizestreams.go} | 0 10 files changed, 326 insertions(+), 24 deletions(-) create mode 100644 pkg/comms/agentserver_test.go create mode 100644 pkg/support/iowrappers/bitpipe.go create mode 100644 pkg/support/iowrappers/channelreadwritecloser.go create mode 100644 pkg/support/iowrappers/channelreadwriter_test.go create mode 100644 pkg/support/iowrappers/inmemoryconnection.go delete mode 100644 pkg/support/iowrappers/io.go create mode 100644 pkg/support/iowrappers/readwriteaddrcloser.go create mode 100644 pkg/support/iowrappers/readwriteaddrcombiner.go create mode 100644 pkg/support/iowrappers/readwritercombiner.go rename pkg/support/iowrappers/{sync.go => synchronizestreams.go} (100%) diff --git a/pkg/comms/agentserver_test.go b/pkg/comms/agentserver_test.go new file mode 100644 index 0000000..83b72f0 --- /dev/null +++ b/pkg/comms/agentserver_test.go @@ -0,0 +1,56 @@ +package comms + +import ( + "context" + "git.wamblee.org/converge/pkg/support/iowrappers" + "github.com/stretchr/testify/suite" + "log" + "sync" + "testing" +) + +type AgentServerTestSuite struct { + suite.Suite +} + +func (suite *AgentServerTestSuite) SetupTest() { +} + +func (suite *AgentServerTestSuite) TearDownTest() { +} + +func TestAgentServerTestSuite(t *testing.T) { + suite.Run(t, &AgentServerTestSuite{}) +} + +func (suite *AgentServerTestSuite) TestNewCommChannel() { + bitpipe := iowrappers.NewInmemoryConnection(context.Background(), "inmemory") + agentConnection := bitpipe.Front() + serverConnection := bitpipe.Back() + requires := suite.Require() + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + log.Println("Agent initializing") + commChannel, err := NewCommChannel(Agent, agentConnection) + requires.Nil(err) + protocolVersion := ProtocolVersion{Version: 10} + err = SendWithTimeout[ProtocolVersion](commChannel.SideChannel, protocolVersion) + requires.Nil(err) + log.Printf("Sent one message %v", protocolVersion) + wg.Done() + }() + + go func() { + log.Println("Server initializing") + commChannel, err := NewCommChannel(ConvergeServer, serverConnection) + requires.Nil(err) + protocolVersion, err := ReceiveWithTimeout[ProtocolVersion](commChannel.SideChannel) + requires.Nil(err) + log.Printf("Received one message %v", protocolVersion) + wg.Done() + }() + wg.Wait() + +} diff --git a/pkg/support/iowrappers/bitpipe.go b/pkg/support/iowrappers/bitpipe.go new file mode 100644 index 0000000..da38201 --- /dev/null +++ b/pkg/support/iowrappers/bitpipe.go @@ -0,0 +1,19 @@ +package iowrappers + +type dummyRemoteAddr string + +func (r dummyRemoteAddr) Network() string { + return string(r) +} +func (r dummyRemoteAddr) String() string { + return string(r) +} + +// A bitpipe is mainly a test utility. It uses byte buffers to perform bi-directional +// communication. The test code can read and write from one side of the bitpipe and the +// code under test reads the other side. + +type BitPipe interface { + Front() ReadWriteAddrCloser + Back() ReadWriteAddrCloser +} diff --git a/pkg/support/iowrappers/channelreadwritecloser.go b/pkg/support/iowrappers/channelreadwritecloser.go new file mode 100644 index 0000000..e271702 --- /dev/null +++ b/pkg/support/iowrappers/channelreadwritecloser.go @@ -0,0 +1,64 @@ +package iowrappers + +import ( + "context" + "io" + "log" +) + +type ChannelReadWriter struct { + ctx context.Context + receiver <-chan []byte + // bytes that were read and that did not fit + readBuf []byte + sender chan<- []byte + closed bool +} + +func NewChannelReadWriter(ctx context.Context, receiver <-chan []byte, + sender chan<- []byte) *ChannelReadWriter { + return &ChannelReadWriter{ + ctx: ctx, + receiver: receiver, + sender: sender, + closed: false, + } +} + +func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) { + nread := copy(p, rw.readBuf) + if nread > 0 { + rw.readBuf = rw.readBuf[nread:] + return nread, nil + } + + select { + case <-rw.ctx.Done(): + log.Println("Context was canceled") + return 0, io.EOF + case data, ok := <-rw.receiver: + if !ok { + log.Println("Channel closed") + return 0, io.EOF + } + nread = copy(p, data) + rw.readBuf = data[nread:] + return nread, nil + } + +} +func (rw *ChannelReadWriter) Write(p []byte) (n int, err error) { + select { + case <-rw.ctx.Done(): + return 0, io.EOF + case rw.sender <- p: + } + return len(p), nil +} +func (rw *ChannelReadWriter) Close() error { + if !rw.closed { + close(rw.sender) + rw.closed = true + } + return nil +} diff --git a/pkg/support/iowrappers/channelreadwriter_test.go b/pkg/support/iowrappers/channelreadwriter_test.go new file mode 100644 index 0000000..9cf2585 --- /dev/null +++ b/pkg/support/iowrappers/channelreadwriter_test.go @@ -0,0 +1,105 @@ +package iowrappers + +import ( + "context" + "github.com/stretchr/testify/suite" + "log" + "sync" + "testing" + "time" +) + +type ChannelReadWriterTestSuite struct { + suite.Suite + + ctx context.Context + cancelFunc context.CancelFunc + toChannel chan<- []byte + fromChannel <-chan []byte + conn *ChannelReadWriter +} + +func TestChannelReadWriterSuite(t *testing.T) { + suite.Run(t, &ChannelReadWriterTestSuite{}) +} + +func (suite *ChannelReadWriterTestSuite) createChannel() { + toChannel := make(chan []byte) + fromChannel := make(chan []byte) + suite.toChannel = toChannel + suite.fromChannel = fromChannel + ctx, cancelFunc := context.WithCancel(context.Background()) + ctx, timeoutCancelFunc := context.WithTimeout(ctx, 10*time.Second) + suite.ctx = ctx + suite.cancelFunc = func() { + timeoutCancelFunc() + cancelFunc() + } + suite.cancelFunc = cancelFunc + suite.conn = NewChannelReadWriter(ctx, toChannel, fromChannel) +} + +func (suite *ChannelReadWriterTestSuite) SetupTest() { + suite.createChannel() +} + +func (suite *ChannelReadWriterTestSuite) TearDownTest() { + suite.cancelFunc() +} + +type TestFunc func() any + +func runAndWait(functions ...TestFunc) []any { + wg := sync.WaitGroup{} + wg.Add(len(functions)) + res := make([]any, len(functions)) + for i, function := range functions { + go func() { + res[i] = function() + wg.Done() + }() + } + wg.Wait() + return res +} + +func (suite *ChannelReadWriterTestSuite) Test_SlicesLargeEnough() { + requires := suite.Require() + data := []byte("hello") + + runAndWait( + func() any { + suite.toChannel <- data + log.Println("data sent") + return nil + }, + func() any { + buf := make([]byte, len(data)*2) + n, err := suite.conn.Read(buf) + requires.Nil(err) + requires.Equal(n, len(data)) + requires.Equal(data, buf[:n]) + return nil + }, + ) +} + +func (suite *ChannelReadWriterTestSuite) Test_SliceTooSmallFullReadInTwoParts() { + suite.FailNow("todo") +} + +func (suite *ChannelReadWriterTestSuite) Test_SliceTooSmallFullREadInManyParts() { + suite.FailNow("todo") +} + +func (suite *ChannelReadWriterTestSuite) Test_Close() { + suite.FailNow("todo") +} + +func (suite *ChannelReadWriterTestSuite) Test_CloseTwice() { + suite.FailNow("todo") +} + +func (suite *ChannelReadWriterTestSuite) Test_ContextCanceled() { + suite.FailNow("todo") +} diff --git a/pkg/support/iowrappers/inmemoryconnection.go b/pkg/support/iowrappers/inmemoryconnection.go new file mode 100644 index 0000000..011f70b --- /dev/null +++ b/pkg/support/iowrappers/inmemoryconnection.go @@ -0,0 +1,32 @@ +package iowrappers + +import "context" + +type InmemoryConnection struct { + ctx context.Context + frontToBack chan ([]byte) + backToFront chan ([]byte) + addr string +} + +func NewInmemoryConnection(ctx context.Context, addr string) *InmemoryConnection { + pipe := InmemoryConnection{ + ctx: ctx, + frontToBack: make(chan []byte), + backToFront: make(chan []byte), + addr: addr, + } + return &pipe +} + +func (bitpipe *InmemoryConnection) Front() *ChannelReadWriter { + return pipe(bitpipe.ctx, bitpipe.backToFront, bitpipe.frontToBack, bitpipe.addr) +} + +func (bitpipe *InmemoryConnection) Back() *ChannelReadWriter { + return pipe(bitpipe.ctx, bitpipe.frontToBack, bitpipe.backToFront, bitpipe.addr) +} + +func pipe(ctx context.Context, receiveBuffer <-chan []byte, sendBuffer chan<- []byte, remoteAddr string) *ChannelReadWriter { + return NewChannelReadWriter(ctx, receiveBuffer, sendBuffer) +} diff --git a/pkg/support/iowrappers/io.go b/pkg/support/iowrappers/io.go deleted file mode 100644 index 67620f0..0000000 --- a/pkg/support/iowrappers/io.go +++ /dev/null @@ -1,24 +0,0 @@ -package iowrappers - -import ( - "io" - "net" -) - -type ReadWriteAddrCloser interface { - io.ReadWriteCloser - - RemoteAddr() net.Addr -} - -type ReadWriterCombiner struct { - io.Reader - io.Writer -} - -func (rw *ReadWriterCombiner) Read(p []byte) (n int, err error) { - return rw.Reader.Read(p) -} -func (rw *ReadWriterCombiner) Write(p []byte) (n int, err error) { - return rw.Writer.Write(p) -} diff --git a/pkg/support/iowrappers/readwriteaddrcloser.go b/pkg/support/iowrappers/readwriteaddrcloser.go new file mode 100644 index 0000000..a215521 --- /dev/null +++ b/pkg/support/iowrappers/readwriteaddrcloser.go @@ -0,0 +1,12 @@ +package iowrappers + +import ( + "io" + "net" +) + +type ReadWriteAddrCloser interface { + io.ReadWriteCloser + + RemoteAddr() net.Addr +} diff --git a/pkg/support/iowrappers/readwriteaddrcombiner.go b/pkg/support/iowrappers/readwriteaddrcombiner.go new file mode 100644 index 0000000..16c755f --- /dev/null +++ b/pkg/support/iowrappers/readwriteaddrcombiner.go @@ -0,0 +1,37 @@ +package iowrappers + +import ( + "io" + "net" +) + +type ReadWriterAddrCombiner struct { + reader io.Reader + writer io.Writer + closer func() error + remoteAddr net.Addr +} + +func (rw *ReadWriterAddrCombiner) Read(p []byte) (n int, err error) { + return rw.reader.Read(p) +} + +func NewReadWriterAddrCombiner(reader io.Reader, writer io.Writer, + closer func() error, remoteAddr net.Addr) *ReadWriterAddrCombiner { + return &ReadWriterAddrCombiner{ + reader: reader, + writer: writer, + closer: closer, + remoteAddr: remoteAddr, + } +} + +func (rw *ReadWriterAddrCombiner) Write(p []byte) (n int, err error) { + return rw.writer.Write(p) +} +func (rw *ReadWriterAddrCombiner) Close() error { + return rw.closer() +} +func (rw *ReadWriterAddrCombiner) RemoteAddr() net.Addr { + return rw.remoteAddr +} diff --git a/pkg/support/iowrappers/readwritercombiner.go b/pkg/support/iowrappers/readwritercombiner.go new file mode 100644 index 0000000..34c2edb --- /dev/null +++ b/pkg/support/iowrappers/readwritercombiner.go @@ -0,0 +1 @@ +package iowrappers diff --git a/pkg/support/iowrappers/sync.go b/pkg/support/iowrappers/synchronizestreams.go similarity index 100% rename from pkg/support/iowrappers/sync.go rename to pkg/support/iowrappers/synchronizestreams.go