converge/pkg/testsupport/channelreadwriter.go
Erik Brakkee 2e8107ddbd discovered net.Pipe for testing TCP connections, which makes the
previously developed ChannelReadWriter and InmemoryConnection obsolete.
2024-09-08 11:16:49 +02:00

101 lines
1.9 KiB
Go

package testsupport
import (
"context"
"errors"
"io"
"log"
"runtime"
"sync"
)
// ChannelReadWriter exposes a pair of byte-slice channels through Read,
// Write and Close methods: Read drains the receiver channel, Write pushes
// onto the sender channel, and Close closes the sender channel.
type ChannelReadWriter struct {
	// ctx aborts Read and Write once it is done.
	ctx context.Context

	// receiverMutex is held for the duration of every Read call.
	receiverMutex sync.Mutex
	// receiver is the channel Read takes its chunks from.
	receiver <-chan []byte
	// readBuf holds the tail of a received chunk that did not fit into the
	// buffer passed to Read; it is served first by the next Read.
	readBuf []byte

	// senderMutex is held for the duration of every Write and Close call.
	senderMutex sync.Mutex
	// sender is the channel Write sends to and Close closes.
	sender chan<- []byte
	// closed is set by Close; Write refuses to send once it is true.
	closed bool
}
// NewChannelReadWriter builds a ChannelReadWriter that reads chunks from
// receiver and writes chunks to sender. The given ctx is stored and checked
// by Read and Write. The returned value starts out open, with unlocked
// mutexes and no buffered bytes.
func NewChannelReadWriter(ctx context.Context, receiver <-chan []byte,
	sender chan<- []byte) *ChannelReadWriter {
	rw := new(ChannelReadWriter)

	// Wiring supplied by the caller.
	rw.ctx = ctx
	rw.receiver = receiver
	rw.sender = sender

	// Internal state, spelled out explicitly even though these are the
	// zero values.
	rw.receiverMutex = sync.Mutex{}
	rw.senderMutex = sync.Mutex{}
	rw.closed = false

	return rw
}
// Read implements io.Reader on top of the receiver channel.
//
// Bytes left over from a previously received chunk are returned first. If
// there are none, Read blocks until a chunk arrives on the receiver channel,
// copies as much of it as fits into p and keeps the remainder for the next
// call.
//
// It returns io.EOF once the receiver channel has been closed and
// io.ErrClosedPipe when the context is done while waiting. A zero-length p
// yields (0, nil) without touching the channel or the buffered remainder.
func (rw *ChannelReadWriter) Read(p []byte) (n int, err error) {
	rw.receiverMutex.Lock()
	defer rw.receiverMutex.Unlock()

	// Without this guard an empty p would fall through to the channel
	// receive below and overwrite readBuf, dropping unread bytes.
	if len(p) == 0 {
		return 0, nil
	}

	// Serve the remainder of the previous chunk before taking a new one.
	nread := copy(p, rw.readBuf)
	if nread > 0 {
		log.Printf("Read %v bytes", nread)
		rw.readBuf = rw.readBuf[nread:]
		return nread, nil
	}

	select {
	case <-rw.ctx.Done():
		return 0, io.ErrClosedPipe
	case data, ok := <-rw.receiver:
		if !ok {
			// A closed channel is the end of the stream; io.EOF lets
			// io.ReadAll, io.Copy and friends terminate cleanly.
			return 0, io.EOF
		}
		nread = copy(p, data)
		// Keep whatever did not fit for the next call.
		rw.readBuf = data[nread:]
		return nread, nil
	}
}
// Write implements io.Writer on top of the sender channel.
//
// A copy of p is sent as a single chunk; the call blocks until the chunk has
// been accepted by the channel or the context is done. It returns an error
// if Close was called before, and io.ErrClosedPipe if the context is done
// before the chunk could be sent. On success it reports len(p) bytes written.
func (rw *ChannelReadWriter) Write(p []byte) (n int, err error) {
	rw.senderMutex.Lock()
	defer rw.senderMutex.Unlock()

	if rw.closed {
		return 0, errors.New("Write on closed channel")
	}

	// If the context is already cancelled, never write. The blocking select
	// below picks randomly among ready cases, so it cannot guarantee this
	// on its own.
	select {
	case <-rw.ctx.Done():
		return 0, io.ErrClosedPipe
	default:
	}

	// io.Writer implementations must not retain p: the caller may reuse its
	// buffer as soon as Write returns, while the chunk may still be queued
	// in the channel or partially buffered by the reader.
	data := append([]byte(nil), p...)

	select {
	// Deal with cancellation during the write.
	case <-rw.ctx.Done():
		return 0, io.ErrClosedPipe
	case rw.sender <- data:
	}
	return len(p), nil
}
// Close closes the sender channel and marks the ChannelReadWriter as closed
// so that later Write calls fail. The first call also logs the stack of the
// calling goroutine. Further calls are no-ops. It always returns nil.
//
// Close takes the same mutex as Write, so it waits for a Write that is in
// progress.
func (rw *ChannelReadWriter) Close() error {
	rw.senderMutex.Lock()
	defer rw.senderMutex.Unlock()

	if rw.closed {
		return nil
	}

	log.Println("Closing ChannelReadWriter")
	// runtime.Stack only fills the buffer it is given, so the buffer needs
	// real capacity; a longer trace is truncated to this size.
	buf := make([]byte, 8192)
	buf = buf[:runtime.Stack(buf, false)]
	log.Printf("Stack %v", string(buf))

	close(rw.sender)
	rw.closed = true
	return nil
}