fixed issue with ChannelReadWriter for unbuffered channels.
Was a concurrency issue int he Write method that retained the slice p that was passed in, making concurrency issues much more likely with unbuffered channels.
This commit is contained in:
parent
6dc8f67570
commit
dca0772dc3
@ -4,8 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"git.wamblee.org/converge/pkg/testsupport"
|
"git.wamblee.org/converge/pkg/testsupport"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -18,8 +18,8 @@ type AgentServerTestSuite struct {
|
|||||||
cancelFunc context.CancelFunc
|
cancelFunc context.CancelFunc
|
||||||
pprofServer *http.Server
|
pprofServer *http.Server
|
||||||
|
|
||||||
agentConnection net.Conn
|
agentConnection io.ReadWriteCloser
|
||||||
serverConnection net.Conn
|
serverConnection io.ReadWriteCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AgentServerTestSuite) SetupSuite() {
|
func (s *AgentServerTestSuite) SetupSuite() {
|
||||||
@ -34,10 +34,14 @@ func (s *AgentServerTestSuite) SetupTest() {
|
|||||||
ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second)
|
ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second)
|
||||||
s.ctx = ctx
|
s.ctx = ctx
|
||||||
s.cancelFunc = cancelFunc
|
s.cancelFunc = cancelFunc
|
||||||
serverConnection, agentConnection := net.Pipe()
|
|
||||||
deadline := time.Now().Add(10 * time.Second)
|
// Could have also used net.Pipe but net.Pipe uses synchronous communication
|
||||||
serverConnection.SetDeadline(deadline)
|
// by default and the bitpipe implementation can become asynchronous when
|
||||||
agentConnection.SetDeadline(deadline)
|
// a channels ize > 0 is passed in. Also the test utility respects the context
|
||||||
|
// so also deals with cancellation much better than net.Pipe.
|
||||||
|
bitpipe := testsupport.NewInmemoryConnection(s.ctx, "inmemory", 10)
|
||||||
|
agentConnection := bitpipe.Front()
|
||||||
|
serverConnection := bitpipe.Back()
|
||||||
s.serverConnection = serverConnection
|
s.serverConnection = serverConnection
|
||||||
s.agentConnection = agentConnection
|
s.agentConnection = agentConnection
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
|
|||||||
return 0, io.ErrClosedPipe
|
return 0, io.ErrClosedPipe
|
||||||
case data, ok := <-rw.receiver:
|
case data, ok := <-rw.receiver:
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, errors.New("ladida") //io.EOF
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
nread = copy(p, data)
|
nread = copy(p, data)
|
||||||
rw.readBuf = data[nread:]
|
rw.readBuf = data[nread:]
|
||||||
@ -57,7 +57,9 @@ func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rw *ChannelReadWriter) Write(p []byte) (n int, err error) {
|
func (rw *ChannelReadWriter) Write(pIn []byte) (n int, err error) {
|
||||||
|
p := make([]byte, len(pIn), len(pIn))
|
||||||
|
copy(p, pIn)
|
||||||
rw.senderMutex.Lock()
|
rw.senderMutex.Lock()
|
||||||
defer rw.senderMutex.Unlock()
|
defer rw.senderMutex.Unlock()
|
||||||
|
|
||||||
|
@ -9,12 +9,12 @@ type InmemoryConnection struct {
|
|||||||
addr string
|
addr string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInmemoryConnection(ctx context.Context, addr string) *InmemoryConnection {
|
func NewInmemoryConnection(ctx context.Context, addr string, channelSize int) *InmemoryConnection {
|
||||||
pipe := InmemoryConnection{
|
pipe := InmemoryConnection{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
// TODO: somehow does not work with unbuffered channel and yamux
|
// TODO: somehow does not work with unbuffered channel and yamux
|
||||||
frontToBack: make(chan []byte, 0),
|
frontToBack: make(chan []byte, channelSize),
|
||||||
backToFront: make(chan []byte, 0),
|
backToFront: make(chan []byte, channelSize),
|
||||||
addr: addr,
|
addr: addr,
|
||||||
}
|
}
|
||||||
return &pipe
|
return &pipe
|
||||||
|
@ -25,7 +25,7 @@ func (s *InMemoryTestSuite) createConnection() {
|
|||||||
ctx, cancelFunc := CreateTestContext(context.Background(), 10*time.Second)
|
ctx, cancelFunc := CreateTestContext(context.Background(), 10*time.Second)
|
||||||
s.ctx = ctx
|
s.ctx = ctx
|
||||||
s.cancelFunc = cancelFunc
|
s.cancelFunc = cancelFunc
|
||||||
s.pipe = NewInmemoryConnection(ctx, "inmemory")
|
s.pipe = NewInmemoryConnection(ctx, "inmemory", 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InMemoryTestSuite) SetupSuite() {
|
func (s *InMemoryTestSuite) SetupSuite() {
|
||||||
|
Loading…
Reference in New Issue
Block a user