renamed ChannelReadWriter to ChannelReadWriteCloser.
This commit is contained in:
parent
f82f656b50
commit
6eef930232
@ -9,7 +9,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ChannelReadWriter struct {
|
type ChannelReadWriteCloser struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
receiverMutex sync.Mutex
|
receiverMutex sync.Mutex
|
||||||
@ -22,9 +22,9 @@ type ChannelReadWriter struct {
|
|||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChannelReadWriter(ctx context.Context, receiver <-chan []byte,
|
func NewChannelReadWriteCloser(ctx context.Context, receiver <-chan []byte,
|
||||||
sender chan<- []byte) *ChannelReadWriter {
|
sender chan<- []byte) *ChannelReadWriteCloser {
|
||||||
return &ChannelReadWriter{
|
return &ChannelReadWriteCloser{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
receiverMutex: sync.Mutex{},
|
receiverMutex: sync.Mutex{},
|
||||||
receiver: receiver,
|
receiver: receiver,
|
||||||
@ -34,7 +34,7 @@ func NewChannelReadWriter(ctx context.Context, receiver <-chan []byte,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
|
func (rw *ChannelReadWriteCloser) Read(p []byte) (n int, err error) {
|
||||||
rw.receiverMutex.Lock()
|
rw.receiverMutex.Lock()
|
||||||
defer rw.receiverMutex.Unlock()
|
defer rw.receiverMutex.Unlock()
|
||||||
|
|
||||||
@ -57,7 +57,7 @@ func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rw *ChannelReadWriter) Write(pIn []byte) (n int, err error) {
|
func (rw *ChannelReadWriteCloser) Write(pIn []byte) (n int, err error) {
|
||||||
p := make([]byte, len(pIn), len(pIn))
|
p := make([]byte, len(pIn), len(pIn))
|
||||||
copy(p, pIn)
|
copy(p, pIn)
|
||||||
rw.senderMutex.Lock()
|
rw.senderMutex.Lock()
|
||||||
@ -85,12 +85,12 @@ func (rw *ChannelReadWriter) Write(pIn []byte) (n int, err error) {
|
|||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rw *ChannelReadWriter) Close() error {
|
func (rw *ChannelReadWriteCloser) Close() error {
|
||||||
rw.senderMutex.Lock()
|
rw.senderMutex.Lock()
|
||||||
defer rw.senderMutex.Unlock()
|
defer rw.senderMutex.Unlock()
|
||||||
|
|
||||||
if !rw.closed {
|
if !rw.closed {
|
||||||
log.Println("Closing ChannelReadWriter")
|
log.Println("Closing ChannelReadWriteCloser")
|
||||||
buf := make([]byte, 0)
|
buf := make([]byte, 0)
|
||||||
runtime.Stack(buf, false)
|
runtime.Stack(buf, false)
|
||||||
log.Printf("Stack %v", string(buf))
|
log.Printf("Stack %v", string(buf))
|
||||||
|
@ -19,7 +19,7 @@ type ChannelReadWriterTestSuite struct {
|
|||||||
cancelFunc context.CancelFunc
|
cancelFunc context.CancelFunc
|
||||||
toChannel chan<- []byte
|
toChannel chan<- []byte
|
||||||
fromChannel <-chan []byte
|
fromChannel <-chan []byte
|
||||||
conn *ChannelReadWriter
|
conn *ChannelReadWriteCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestChannelReadWriterSuite(t *testing.T) {
|
func TestChannelReadWriterSuite(t *testing.T) {
|
||||||
@ -36,7 +36,7 @@ func (s *ChannelReadWriterTestSuite) createChannel() {
|
|||||||
ctx, cancelFunc := CreateTestContext(context.Background(), 10*time.Second)
|
ctx, cancelFunc := CreateTestContext(context.Background(), 10*time.Second)
|
||||||
s.ctx = ctx
|
s.ctx = ctx
|
||||||
s.cancelFunc = cancelFunc
|
s.cancelFunc = cancelFunc
|
||||||
s.conn = NewChannelReadWriter(ctx, toChannel, fromChannel)
|
s.conn = NewChannelReadWriteCloser(ctx, toChannel, fromChannel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ChannelReadWriterTestSuite) SetupSuite() {
|
func (s *ChannelReadWriterTestSuite) SetupSuite() {
|
||||||
@ -83,7 +83,7 @@ func (s *ChannelReadWriterTestSuite) Test_SuccessfulChannelToReadWriter() {
|
|||||||
},
|
},
|
||||||
{ // NOTE: no intelligence in the reader to fill up the read buffer when it is not full
|
{ // NOTE: no intelligence in the reader to fill up the read buffer when it is not full
|
||||||
// therefore, the second read will have only 1 char since the first channel read returned
|
// therefore, the second read will have only 1 char since the first channel read returned
|
||||||
// 3 of which 2 where returned in the first read call to the ChannelReadWriter.
|
// 3 of which 2 where returned in the first read call to the ChannelReadWriteCloser.
|
||||||
name: "buffer_too_small_multiple_writes",
|
name: "buffer_too_small_multiple_writes",
|
||||||
data: []string{"hel", "lo"},
|
data: []string{"hel", "lo"},
|
||||||
chunkSizes: []int{2, 2, 2},
|
chunkSizes: []int{2, 2, 2},
|
||||||
|
@ -20,14 +20,14 @@ func NewInmemoryConnection(ctx context.Context, addr string, channelSize int) *I
|
|||||||
return &pipe
|
return &pipe
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bitpipe *InmemoryConnection) Front() *ChannelReadWriter {
|
func (bitpipe *InmemoryConnection) Front() *ChannelReadWriteCloser {
|
||||||
return pipe(bitpipe.ctx, bitpipe.backToFront, bitpipe.frontToBack, bitpipe.addr)
|
return pipe(bitpipe.ctx, bitpipe.backToFront, bitpipe.frontToBack, bitpipe.addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bitpipe *InmemoryConnection) Back() *ChannelReadWriter {
|
func (bitpipe *InmemoryConnection) Back() *ChannelReadWriteCloser {
|
||||||
return pipe(bitpipe.ctx, bitpipe.frontToBack, bitpipe.backToFront, bitpipe.addr)
|
return pipe(bitpipe.ctx, bitpipe.frontToBack, bitpipe.backToFront, bitpipe.addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func pipe(ctx context.Context, receiveBuffer <-chan []byte, sendBuffer chan<- []byte, remoteAddr string) *ChannelReadWriter {
|
func pipe(ctx context.Context, receiveBuffer <-chan []byte, sendBuffer chan<- []byte, remoteAddr string) *ChannelReadWriteCloser {
|
||||||
return NewChannelReadWriter(ctx, receiveBuffer, sendBuffer)
|
return NewChannelReadWriteCloser(ctx, receiveBuffer, sendBuffer)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user