work in progress for testing the matchmaker.

This commit is contained in:
Erik Brakkee 2024-08-23 21:10:01 +02:00
parent 670a705326
commit ba8bd15cf7
9 changed files with 256 additions and 56 deletions

View File

@ -186,10 +186,12 @@ func setupWebSockets(admin *matchmaker.MatchMaker, websessions *ui.WebSessions)
return return
} }
log.Printf("Got registration connection: '%s'\n", publicId) log.Printf("Got registration connection: '%s'\n", publicId)
err = admin.Register(publicId, conn) waitFunc, err := admin.Register(publicId, conn)
if err != nil { if err != nil {
log.Printf("Error %v\n", err) log.Printf("Error %v\n", err)
return
} }
waitFunc()
}, },
} }

1
go.mod
View File

@ -30,6 +30,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
golang.org/x/sys v0.22.0 // indirect golang.org/x/sys v0.22.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect

2
go.sum
View File

@ -48,6 +48,8 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=

View File

@ -38,11 +38,6 @@ type AdminTestSuite struct {
hostKey []byte hostKey []byte
} }
func (s *AdminTestSuite) createPipe() (io.ReadWriteCloser, io.ReadWriteCloser) {
bitpipe := testsupport.NewInmemoryConnection(s.ctx, "inmemory", 10)
return bitpipe.Front(), bitpipe.Back()
}
func (s *AdminTestSuite) SetupSuite() { func (s *AdminTestSuite) SetupSuite() {
s.pprofServer = testsupport.StartPprof("") s.pprofServer = testsupport.StartPprof("")
} }
@ -77,7 +72,7 @@ type AddAgentResult struct {
} }
func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) { func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) {
agentToServerRW, serverToAgentRW := s.createPipe() agentToServerRW, serverToAgentRW := testsupport.CreatePipe(s.ctx)
res := testsupport.RunAndWait( res := testsupport.RunAndWait(
&s.Suite, &s.Suite,
func() any { func() any {
@ -102,7 +97,7 @@ func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId stri
type AgentRegisterResult struct { type AgentRegisterResult struct {
registration comms.AgentRegistration registration comms.AgentRegistration
commChannel comms.CommChannel commChannel comms.CommChannel
listener *TestAgentListener listener *testsupport.TestAgentListener
err error err error
} }
@ -224,7 +219,8 @@ func (s *AdminTestSuite) Test_MultipleAgentsAndClients() {
defer wg.Done() defer wg.Done()
iclient := i iclient := i
client := fmt.Sprintf("client %d/%d", iagent, iclient) client := fmt.Sprintf("client %d/%d", iagent, iclient)
s.connectClientToAgent(client, publicId, data, agentRes) _, err := s.connectClientToAgent(client, publicId, data, agentRes)
s.Nil(err)
}() }()
} }
}() }()
@ -235,7 +231,7 @@ func (s *AdminTestSuite) Test_MultipleAgentsAndClients() {
func (s *AdminTestSuite) connectClientToAgent( func (s *AdminTestSuite) connectClientToAgent(
client string, publicId string, data string, agentRes AgentRegisterResult) (*clientConnection, error) { client string, publicId string, data string, agentRes AgentRegisterResult) (*clientConnection, error) {
serverToClientRW, clientToServerRW := s.createPipe() serverToClientRW, clientToServerRW := testsupport.CreatePipe(s.ctx)
// TODO refactoring // TODO refactoring
// - TestAgentListener should run in a separate go routine // - TestAgentListener should run in a separate go routine
@ -295,36 +291,18 @@ func (s *AdminTestSuite) connectClientToAgent(
go clientConn.Synchronize() go clientConn.Synchronize()
msg := fmt.Sprintf("end-to-end %s", client) msg := fmt.Sprintf("end-to-end %s", client)
// verify bidirectional communication // verify bidirectional communication
s.bidirectionalConnectionCheck(msg, clientToServerRW, agentToServerYamux) testsupport.BidirectionalConnectionCheck(&s.Suite, msg, clientToServerRW, agentToServerYamux)
return clientConn, nil return clientConn, nil
} }
func (s *AdminTestSuite) bidirectionalConnectionCheck(msg string, clientToServerRW io.ReadWriteCloser, agentToServerYamux io.ReadWriter) {
data1 := msg + " client->agent"
data2 := msg + " agent->client"
log.Printf("BIDIRECTIONAL CHECK %v -> %v", msg, agentToServerYamux)
testsupport.RunAndWait(
&s.Suite,
func() any {
testsupport.AssertWriteData(&s.Suite, data1, clientToServerRW)
testsupport.AssertReadData(&s.Suite, data2, clientToServerRW)
return nil
},
func() any {
testsupport.AssertReadData(&s.Suite, data1, agentToServerYamux)
testsupport.AssertWriteData(&s.Suite, data2, agentToServerYamux)
return nil
})
}
func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() { func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() {
publicId := "abc" publicId := "abc"
serverRes, agentRes := s.agentRegisters(publicId, publicId) serverRes, agentRes := s.agentRegisters(publicId, publicId)
s.Nil(serverRes.err) s.Nil(serverRes.err)
s.Nil(agentRes.err) s.Nil(agentRes.err)
serverToClientRW, _ := s.createPipe() serverToClientRW, _ := testsupport.CreatePipe(s.ctx)
_, err := s.admin.AddClient(models.RendezVousId(publicId+"sothatisunknown"), _, err := s.admin.AddClient(models.RendezVousId(publicId+"sothatisunknown"),
iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))) iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr")))
@ -363,7 +341,7 @@ func (s *AdminTestSuite) agentRegistration(agentToServerRW io.ReadWriteCloser) A
s.NotNil(commChannel) s.NotNil(commChannel)
baseListener := comms.NewAgentListener(commChannel.Session) baseListener := comms.NewAgentListener(commChannel.Session)
listener := NewTestListener(s.ctx, baseListener) listener := testsupport.NewTestListener(s.ctx, baseListener)
return AgentRegisterResult{ return AgentRegisterResult{
registration: agentRegistration, registration: agentRegistration,
@ -381,10 +359,10 @@ func (s *AdminTestSuite) connectClient(publicId string, serverToClientRW io.Read
return clientConn return clientConn
} }
func (s *AdminTestSuite) clientConnection(clientId models.ClientId, listener *TestAgentListener) (net.Conn, error) { func (s *AdminTestSuite) clientConnection(clientId models.ClientId, listener *testsupport.TestAgentListener) (net.Conn, error) {
// agent // agent
log.Printf("clientConnection: Getting connection for %v", clientId) log.Printf("clientConnection: Getting connection for %v", clientId)
agentToServerYamux, err := listener.getConnection(clientId) agentToServerYamux, err := listener.GetConnection(string(clientId))
log.Printf("clientConnection: Got connection %v for client %v", agentToServerYamux, clientId) log.Printf("clientConnection: Got connection %v for client %v", agentToServerYamux, clientId)
s.Nil(err) s.Nil(err)
return agentToServerYamux, err return agentToServerYamux, err

View File

@ -33,24 +33,35 @@ func NewMatchMaker(notifier Notifier) *MatchMaker {
return &converge return &converge
} }
func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadWriteCloser) error { func (converge *MatchMaker) Close() {
converge.admin.Close()
}
type WaitForAgentFunc func()
func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadWriteCloser) (waitForAgentFunc WaitForAgentFunc, err error) {
serverInfo := comms.ServerInfo{} serverInfo := comms.ServerInfo{}
agentInfo, err := comms.ServerInitialization(conn, serverInfo) agentInfo, err := comms.ServerInitialization(conn, serverInfo)
if err != nil { if err != nil {
return err return nil, err
} }
agent, err := converge.admin.AddAgent(hostPrivateKey, publicId, agentInfo, conn) agent, err := converge.admin.AddAgent(hostPrivateKey, publicId, agentInfo, conn)
converge.logStatus() converge.logStatus()
if err != nil { if err != nil {
return err return nil, err
} }
publicId = agent.Info.PublicId publicId = agent.Info.PublicId
defer func() { cleanupFunc := func() {
converge.admin.RemoveAgent(publicId) converge.admin.RemoveAgent(publicId)
converge.logStatus() converge.logStatus()
}
defer func() {
if err != nil {
cleanupFunc()
}
}() }()
go func() { go func() {
@ -72,11 +83,14 @@ func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadW
}) })
}() }()
go log.Printf("agentConnection registered: '%s'\n", publicId) return WaitForAgentFunc(func() {
defer cleanupFunc()
log.Printf("agentConnection registered: '%s'\n", publicId)
for !agent.CommChannel.Session.IsClosed() { for !agent.CommChannel.Session.IsClosed() {
time.Sleep(250 * time.Millisecond) time.Sleep(250 * time.Millisecond)
} }
return nil log.Printf("Agent disconnected")
}), nil
} }
func (converge *MatchMaker) Connect(wsProxyMode bool, publicId models.RendezVousId, conn iowrappers2.ReadWriteAddrCloser) error { func (converge *MatchMaker) Connect(wsProxyMode bool, publicId models.RendezVousId, conn iowrappers2.ReadWriteAddrCloser) error {

View File

@ -0,0 +1,163 @@
package matchmaker
import (
"context"
"git.wamblee.org/converge/pkg/comms"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/testsupport"
"github.com/stretchr/testify/suite"
"go.uber.org/goleak"
"io"
"log"
"net/http"
"testing"
"time"
)
type MatchMakerTestSuite struct {
suite.Suite
ctx context.Context
cancelFunc context.CancelFunc
pprofServer *http.Server
notifier *TestNotifier
matchMaker *MatchMaker
}
type TestNotifier struct {
// last reported state
state *models.State
}
func (notifier *TestNotifier) Publish(state *models.State) {
notifier.state = state
}
func (s *MatchMakerTestSuite) SetupSuite() {
s.pprofServer = testsupport.StartPprof("")
}
func (s *MatchMakerTestSuite) TearDownSuite() {
testsupport.StopPprof(s.ctx, s.pprofServer)
}
func (s *MatchMakerTestSuite) SetupTest() {
ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second)
s.ctx = ctx
s.cancelFunc = cancelFunc
s.notifier = &TestNotifier{}
s.matchMaker = NewMatchMaker(s.notifier)
}
func (s *MatchMakerTestSuite) TearDownTest() {
s.matchMaker.Close()
s.cancelFunc()
goleak.VerifyNone(s.T())
}
func TestMatchMakerTestSuite(t *testing.T) {
suite.Run(t, &MatchMakerTestSuite{})
}
type TestAgent struct {
agentSideConn io.ReadWriteCloser
serverSIdeConn io.ReadWriteCloser
agentRegistration comms.AgentRegistration
commChannel comms.CommChannel
listener *testsupport.TestAgentListener
}
func NewTestAgent(ctx context.Context) *TestAgent {
res := TestAgent{}
a, s := testsupport.CreatePipe(ctx)
res.agentSideConn = a
res.serverSIdeConn = s
return &res
}
func (agent *TestAgent) Disconnect() {
agent.agentSideConn.Close()
}
func (agent *TestAgent) Initialize(s *MatchMakerTestSuite) (comms.ServerInfo, error) {
return comms.AgentInitialization(agent.agentSideConn, comms.NewEnvironmentInfo("bash"))
}
func (agent *TestAgent) Register(s *MatchMakerTestSuite) error {
agentRegistration, err := comms.ReceiveRegistrationMessage(agent.agentSideConn)
if err != nil {
return err
}
agent.agentRegistration = agentRegistration
commChannel, err := comms.NewCommChannel(comms.Agent, agent.agentSideConn)
if err != nil {
return err
}
s.NotNil(commChannel)
agent.commChannel = commChannel
baseListener := comms.NewAgentListener(commChannel.Session)
agent.listener = testsupport.NewTestListener(s.ctx, baseListener)
return nil
}
func (s *MatchMakerTestSuite) Test_newMatchMaker() {
s.checkState(0, 0)
}
func (s *MatchMakerTestSuite) Test_singleAgent() {
publicId := models.RendezVousId("abc")
agent := NewTestAgent(s.ctx)
waitForAgentFunc := s.registerAgent(publicId, agent)
s.checkState(1, 0)
// required for connection loss detection
go waitForAgentFunc()
agent.Disconnect()
s.checkState(0, 0)
}
func (s *MatchMakerTestSuite) checkState(nAgents int, nClients int) {
s.True(testsupport.CheckCondition(s.ctx, func() bool {
return nAgents == len(s.notifier.state.Agents)
}))
s.True(testsupport.CheckCondition(s.ctx, func() bool {
return nClients == len(s.notifier.state.Clients)
}))
}
func (s *MatchMakerTestSuite) registerAgent(publicId models.RendezVousId, agent *TestAgent) WaitForAgentFunc {
res := testsupport.RunAndWait(
&s.Suite,
func() any {
// ignore waitFunc for now.
waitFunc, err := s.matchMaker.Register(publicId, agent.serverSIdeConn)
s.Nil(err)
log.Printf("MatchMaskerTest: Agent registered by server")
return waitFunc
},
func() any {
_, err := agent.Initialize(s)
if err != nil {
s.Nil(err)
return nil
}
err = agent.Register(s)
if err != nil {
s.Nil(err)
return nil
}
log.Println("MatchMakerTest: Agent connected to server")
return nil
})
if res[0] == nil {
return nil
}
return res[0].(WaitForAgentFunc)
}

View File

@ -1,6 +1,9 @@
package testsupport package testsupport
import "context" import (
"context"
"io"
)
type InmemoryConnection struct { type InmemoryConnection struct {
ctx context.Context ctx context.Context
@ -31,3 +34,8 @@ func (bitpipe *InmemoryConnection) Back() *ChannelReadWriteCloser {
func pipe(ctx context.Context, receiveBuffer <-chan []byte, sendBuffer chan<- []byte, remoteAddr string) *ChannelReadWriteCloser { func pipe(ctx context.Context, receiveBuffer <-chan []byte, sendBuffer chan<- []byte, remoteAddr string) *ChannelReadWriteCloser {
return NewChannelReadWriteCloser(ctx, receiveBuffer, sendBuffer) return NewChannelReadWriteCloser(ctx, receiveBuffer, sendBuffer)
} }
func CreatePipe(ctx context.Context) (io.ReadWriteCloser, io.ReadWriteCloser) {
bitpipe := NewInmemoryConnection(ctx, "inmemory", 10)
return bitpipe.Front(), bitpipe.Back()
}

View File

@ -1,16 +1,16 @@
package admin package testsupport
import ( import (
"context" "context"
"errors" "errors"
"git.wamblee.org/converge/pkg/models"
"log" "log"
"net" "net"
"sync" "sync"
) )
// Extension of agentlistener for testing. It can accept all connections and puts them into a map based // Extension of agentlistener for testing. It can accept all connections and puts them into a map based
// on clientId after which a client can retrieve the accepted connection based on client id. // on clientId after which a client can retrieve the accepted connection based on local address (which is the
// client id.
type TestAgentListener struct { type TestAgentListener struct {
net.Listener net.Listener
@ -18,7 +18,7 @@ type TestAgentListener struct {
ctx context.Context ctx context.Context
mutex sync.Mutex mutex sync.Mutex
cond *sync.Cond cond *sync.Cond
connections map[models.ClientId]net.Conn connections map[string]net.Conn
} }
func NewTestListener(ctx context.Context, listener net.Listener) *TestAgentListener { func NewTestListener(ctx context.Context, listener net.Listener) *TestAgentListener {
@ -26,7 +26,7 @@ func NewTestListener(ctx context.Context, listener net.Listener) *TestAgentListe
ctx: ctx, ctx: ctx,
Listener: listener, Listener: listener,
mutex: sync.Mutex{}, mutex: sync.Mutex{},
connections: make(map[models.ClientId]net.Conn), connections: make(map[string]net.Conn),
} }
res.cond = sync.NewCond(&res.mutex) res.cond = sync.NewCond(&res.mutex)
@ -56,23 +56,23 @@ func (l *TestAgentListener) Accept() (net.Conn, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
clientId := models.ClientId(conn.LocalAddr().String()) localAddr := conn.LocalAddr().String()
log.Printf("testlistener: Storing connection %v %v", clientId, conn) log.Printf("testlistener: Storing connection %v %v", localAddr, conn)
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
l.connections[clientId] = conn l.connections[localAddr] = conn
log.Printf("testlistener: broadcasting %v", clientId) log.Printf("testlistener: broadcasting %v", localAddr)
l.cond.Broadcast() l.cond.Broadcast()
return conn, err return conn, err
} }
func (l *TestAgentListener) getConnection(clientId models.ClientId) (net.Conn, error) { func (l *TestAgentListener) GetConnection(localAddr string) (net.Conn, error) {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
// We need to check the condition before the first cond.wait as well. Otherwise, a broadcast sent // We need to check the condition before the first cond.wait as well. Otherwise, a broadcast sent
// at this point in time will not be caught, and if there are no further broadcasts happening, then // at this point in time will not be caught, and if there are no further broadcasts happening, then
// the code will hang her. // the code will hang her.
for ok := l.connections[clientId] != nil; !ok; ok = l.connections[clientId] != nil { for ok := l.connections[localAddr] != nil; !ok; ok = l.connections[localAddr] != nil {
log.Println("Listener cond wait") log.Println("Listener cond wait")
l.cond.Wait() l.cond.Wait()
log.Println("Listener awoken") log.Println("Listener awoken")
@ -82,6 +82,6 @@ func (l *TestAgentListener) getConnection(clientId models.ClientId) (net.Conn, e
default: default:
} }
} }
log.Printf("Returning connection %v %v", clientId, l.connections[clientId]) log.Printf("Returning connection %v %v", localAddr, l.connections[localAddr])
return l.connections[clientId], nil return l.connections[localAddr], nil
} }

View File

@ -96,3 +96,35 @@ func PrintStackTraces() {
log.Println(string(buf)) log.Println(string(buf))
log.Println("") log.Println("")
} }
func BidirectionalConnectionCheck(s *suite.Suite, msg string, clientToServerRW io.ReadWriteCloser, agentToServerYamux io.ReadWriter) {
data1 := msg + " -> "
data2 := msg + " <- "
log.Printf("BIDIRECTIONAL CHECK %v", msg)
RunAndWait(
s,
func() any {
AssertWriteData(s, data1, clientToServerRW)
AssertReadData(s, data2, clientToServerRW)
return nil
},
func() any {
AssertReadData(s, data1, agentToServerYamux)
AssertWriteData(s, data2, agentToServerYamux)
return nil
})
}
// having the return type bool forces the check to be done in the test code
// leading to more clear error messages.
func CheckCondition(ctx context.Context, condition func() bool) bool {
for !condition() {
select {
case <-ctx.Done():
return false
default:
time.Sleep(1 * time.Millisecond)
}
}
return true
}