fixed issue with ChannelReadWriter for unbuffered channels.

Was a concurrency issue int he Write method that retained the
slice p that was passed in, making concurrency issues much more likely
with unbuffered channels.
This commit is contained in:
Erik Brakkee 2024-08-21 16:32:37 +02:00
parent cfed204af5
commit f82f656b50
4 changed files with 19 additions and 13 deletions

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"git.wamblee.org/converge/pkg/testsupport" "git.wamblee.org/converge/pkg/testsupport"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"io"
"log" "log"
"net"
"net/http" "net/http"
"testing" "testing"
"time" "time"
@ -18,8 +18,8 @@ type AgentServerTestSuite struct {
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
pprofServer *http.Server pprofServer *http.Server
agentConnection net.Conn agentConnection io.ReadWriteCloser
serverConnection net.Conn serverConnection io.ReadWriteCloser
} }
func (s *AgentServerTestSuite) SetupSuite() { func (s *AgentServerTestSuite) SetupSuite() {
@ -34,10 +34,14 @@ func (s *AgentServerTestSuite) SetupTest() {
ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second) ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second)
s.ctx = ctx s.ctx = ctx
s.cancelFunc = cancelFunc s.cancelFunc = cancelFunc
serverConnection, agentConnection := net.Pipe()
deadline := time.Now().Add(10 * time.Second) // Could have also used net.Pipe but net.Pipe uses synchronous communication
serverConnection.SetDeadline(deadline) // by default and the bitpipe implementation can become asynchronous when
agentConnection.SetDeadline(deadline) // a channels ize > 0 is passed in. Also the test utility respects the context
// so also deals with cancellation much better than net.Pipe.
bitpipe := testsupport.NewInmemoryConnection(s.ctx, "inmemory", 10)
agentConnection := bitpipe.Front()
serverConnection := bitpipe.Back()
s.serverConnection = serverConnection s.serverConnection = serverConnection
s.agentConnection = agentConnection s.agentConnection = agentConnection
} }

View File

@ -49,7 +49,7 @@ func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
case data, ok := <-rw.receiver: case data, ok := <-rw.receiver:
if !ok { if !ok {
return 0, errors.New("ladida") //io.EOF return 0, io.EOF
} }
nread = copy(p, data) nread = copy(p, data)
rw.readBuf = data[nread:] rw.readBuf = data[nread:]
@ -57,7 +57,9 @@ func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
} }
} }
func (rw *ChannelReadWriter) Write(p []byte) (n int, err error) { func (rw *ChannelReadWriter) Write(pIn []byte) (n int, err error) {
p := make([]byte, len(pIn), len(pIn))
copy(p, pIn)
rw.senderMutex.Lock() rw.senderMutex.Lock()
defer rw.senderMutex.Unlock() defer rw.senderMutex.Unlock()

View File

@ -9,12 +9,12 @@ type InmemoryConnection struct {
addr string addr string
} }
func NewInmemoryConnection(ctx context.Context, addr string) *InmemoryConnection { func NewInmemoryConnection(ctx context.Context, addr string, channelSize int) *InmemoryConnection {
pipe := InmemoryConnection{ pipe := InmemoryConnection{
ctx: ctx, ctx: ctx,
// TODO: somehow does not work with unbuffered channel and yamux // TODO: somehow does not work with unbuffered channel and yamux
frontToBack: make(chan []byte, 0), frontToBack: make(chan []byte, channelSize),
backToFront: make(chan []byte, 0), backToFront: make(chan []byte, channelSize),
addr: addr, addr: addr,
} }
return &pipe return &pipe

View File

@ -25,7 +25,7 @@ func (s *InMemoryTestSuite) createConnection() {
ctx, cancelFunc := CreateTestContext(context.Background(), 10*time.Second) ctx, cancelFunc := CreateTestContext(context.Background(), 10*time.Second)
s.ctx = ctx s.ctx = ctx
s.cancelFunc = cancelFunc s.cancelFunc = cancelFunc
s.pipe = NewInmemoryConnection(ctx, "inmemory") s.pipe = NewInmemoryConnection(ctx, "inmemory", 10)
} }
func (s *InMemoryTestSuite) SetupSuite() { func (s *InMemoryTestSuite) SetupSuite() {