From f82f656b50ee2f174bf8f1d6da02fbe61c593440 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Wed, 21 Aug 2024 16:32:37 +0200 Subject: [PATCH] fixed issue with ChannelReadWriter for unbuffered channels. Was a concurrency issue int he Write method that retained the slice p that was passed in, making concurrency issues much more likely with unbuffered channels. --- pkg/comms/agentserver_test.go | 18 +++++++++++------- pkg/testsupport/channelreadwriter.go | 6 ++++-- pkg/testsupport/inmemoryconnection.go | 6 +++--- pkg/testsupport/inmemoryconnection_test.go | 2 +- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/comms/agentserver_test.go b/pkg/comms/agentserver_test.go index 316f4a4..0e56d7c 100644 --- a/pkg/comms/agentserver_test.go +++ b/pkg/comms/agentserver_test.go @@ -4,8 +4,8 @@ import ( "context" "git.wamblee.org/converge/pkg/testsupport" "github.com/stretchr/testify/suite" + "io" "log" - "net" "net/http" "testing" "time" @@ -18,8 +18,8 @@ type AgentServerTestSuite struct { cancelFunc context.CancelFunc pprofServer *http.Server - agentConnection net.Conn - serverConnection net.Conn + agentConnection io.ReadWriteCloser + serverConnection io.ReadWriteCloser } func (s *AgentServerTestSuite) SetupSuite() { @@ -34,10 +34,14 @@ func (s *AgentServerTestSuite) SetupTest() { ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second) s.ctx = ctx s.cancelFunc = cancelFunc - serverConnection, agentConnection := net.Pipe() - deadline := time.Now().Add(10 * time.Second) - serverConnection.SetDeadline(deadline) - agentConnection.SetDeadline(deadline) + + // Could have also used net.Pipe but net.Pipe uses synchronous communication + // by default and the bitpipe implementation can become asynchronous when + // a channels ize > 0 is passed in. Also the test utility respects the context + // so also deals with cancellation much better than net.Pipe. + bitpipe := testsupport.NewInmemoryConnection(s.ctx, "inmemory", 10) + agentConnection := bitpipe.Front() + serverConnection := bitpipe.Back() s.serverConnection = serverConnection s.agentConnection = agentConnection } diff --git a/pkg/testsupport/channelreadwriter.go b/pkg/testsupport/channelreadwriter.go index 1ee07f6..f979f76 100644 --- a/pkg/testsupport/channelreadwriter.go +++ b/pkg/testsupport/channelreadwriter.go @@ -49,7 +49,7 @@ func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) { return 0, io.ErrClosedPipe case data, ok := <-rw.receiver: if !ok { - return 0, errors.New("ladida") //io.EOF + return 0, io.EOF } nread = copy(p, data) rw.readBuf = data[nread:] @@ -57,7 +57,9 @@ func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) { } } -func (rw *ChannelReadWriter) Write(p []byte) (n int, err error) { +func (rw *ChannelReadWriter) Write(pIn []byte) (n int, err error) { + p := make([]byte, len(pIn), len(pIn)) + copy(p, pIn) rw.senderMutex.Lock() defer rw.senderMutex.Unlock() diff --git a/pkg/testsupport/inmemoryconnection.go b/pkg/testsupport/inmemoryconnection.go index acf50a1..c445bb8 100644 --- a/pkg/testsupport/inmemoryconnection.go +++ b/pkg/testsupport/inmemoryconnection.go @@ -9,12 +9,12 @@ type InmemoryConnection struct { addr string } -func NewInmemoryConnection(ctx context.Context, addr string) *InmemoryConnection { +func NewInmemoryConnection(ctx context.Context, addr string, channelSize int) *InmemoryConnection { pipe := InmemoryConnection{ ctx: ctx, // TODO: somehow does not work with unbuffered channel and yamux - frontToBack: make(chan []byte, 0), - backToFront: make(chan []byte, 0), + frontToBack: make(chan []byte, channelSize), + backToFront: make(chan []byte, channelSize), addr: addr, } return &pipe diff --git a/pkg/testsupport/inmemoryconnection_test.go b/pkg/testsupport/inmemoryconnection_test.go index 90d3580..0eed707 100644 --- a/pkg/testsupport/inmemoryconnection_test.go +++ b/pkg/testsupport/inmemoryconnection_test.go @@ -25,7 +25,7 @@ func (s *InMemoryTestSuite) createConnection() { ctx, cancelFunc := CreateTestContext(context.Background(), 10*time.Second) s.ctx = ctx s.cancelFunc = cancelFunc - s.pipe = NewInmemoryConnection(ctx, "inmemory") + s.pipe = NewInmemoryConnection(ctx, "inmemory", 10) } func (s *InMemoryTestSuite) SetupSuite() {