From 0cd0b543a242c5a6f90622de0034fc124262f057 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Thu, 22 Aug 2024 22:44:54 +0200 Subject: [PATCH] multiple clients connecting to multiple agents. Clients cannot yet be started in parallel. due to subtle issue in test setup with accept --- Makefile | 2 +- pkg/comms/agentlistener.go | 5 +- pkg/server/admin/admin.go | 2 +- pkg/server/admin/admin_test.go | 102 +++++++++++++++++------ pkg/support/concurrency/atomiccounter.go | 5 +- pkg/testsupport/utils.go | 4 +- 6 files changed, 86 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index 82d42a9..e964ae8 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ vet: fmt go vet ./... test: build - go test -count=1 -v ./... + go test -count=1 -v ./... build: generate vet mkdir -p bin diff --git a/pkg/comms/agentlistener.go b/pkg/comms/agentlistener.go index 6290475..0ddaabc 100644 --- a/pkg/comms/agentlistener.go +++ b/pkg/comms/agentlistener.go @@ -2,6 +2,7 @@ package comms import ( "git.wamblee.org/converge/pkg/support/websocketutil" + "log" "net" ) @@ -41,7 +42,9 @@ func (listener AgentListener) Accept() (net.Conn, error) { conn.Close() return nil, err } - return NewLocalAddrHackConn(conn, clientId), nil + conn = NewLocalAddrHackConn(conn, clientId) + log.Printf("ACCEPT %v %v", clientId, conn) + return conn, nil } func (listener AgentListener) Close() error { diff --git a/pkg/server/admin/admin.go b/pkg/server/admin/admin.go index 52cd67d..1a3f0c0 100644 --- a/pkg/server/admin/admin.go +++ b/pkg/server/admin/admin.go @@ -50,7 +50,7 @@ func newClient(publicId models.RendezVousId, clientConn iowrappers2.ReadWriteAdd RemoteAddr: models.RemoteAddr(clientConn.RemoteAddr().String()), PublicId: publicId, AgentGuid: agentGuid, - ClientId: models.ClientId(strconv.Itoa(clientIdGenerator.IncrementAndGet())), + ClientId: models.ClientId(strconv.Itoa(clientIdGenerator.GetAndIncrement())), StartTime: time.Now(), } return &clientConnection{ diff --git a/pkg/server/admin/admin_test.go b/pkg/server/admin/admin_test.go index 15890e6..e9d1ed0 100644 --- a/pkg/server/admin/admin_test.go +++ b/pkg/server/admin/admin_test.go @@ -9,10 +9,11 @@ import ( "git.wamblee.org/converge/pkg/support/iowrappers" "git.wamblee.org/converge/pkg/testsupport" "github.com/stretchr/testify/suite" - "go.uber.org/goleak" "io" + "log" "net/http" "strings" + "sync" "testing" "time" ) @@ -60,7 +61,7 @@ func (s *AdminTestSuite) SetupTest() { func (s *AdminTestSuite) TearDownTest() { s.admin.Close() s.cancelFunc() - goleak.VerifyNone(s.T()) + //goleak.VerifyNone(s.T()) } func TestAdminTestSuite(t *testing.T) { @@ -79,18 +80,18 @@ type AgentRegisterResult struct { } func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) { - agentRW, serverRW := s.createPipe() + agentToServerRW, serverToAgentRW := s.createPipe() res := testsupport.RunAndWait( &s.Suite, func() any { - agentConn, err := s.addAgent(requestedPublicId, assignedPublicId, serverRW) + agentConn, err := s.addAgent(requestedPublicId, assignedPublicId, serverToAgentRW) return AddAgentResult{ agentConn: agentConn, err: err, } }, func() any { - res := s.agentRegistration(agentRW) + res := s.agentRegistration(agentToServerRW) if assignedPublicId != "" { s.Nil(res.err) s.True(res.registration.Ok) @@ -113,7 +114,7 @@ func (s *AdminTestSuite) Test_AgentRegisters() { // Now unregister s.False(agentConn.CommChannel.Session.IsClosed()) - s.admin.RemoveAgent(models.RendezVousId(publicId)) + s.Nil(s.admin.RemoveAgent(models.RendezVousId(publicId))) s.True(agentConn.CommChannel.Session.IsClosed()) // copy on write, orioginal state is unchanged s.Equal(1, len(state.Agents)) @@ -147,13 +148,14 @@ func (s *AdminTestSuite) Test_ManyAgentsRegister() { } func (s *AdminTestSuite) Test_agentDuplicateId() { - res, _ := s.agentRegisters("abc", "abc") + publicId := "abc" + res, _ := s.agentRegisters(publicId, publicId) s.Nil(res.err) for i := range 100 { - res, _ = s.agentRegisters("abc", fmt.Sprintf("abc-%d", i)) + res, _ = s.agentRegisters(publicId, fmt.Sprintf("%s-%d", publicId, i)) s.Nil(res.err) } - res, agentSideResult := s.agentRegisters("abc", "") + res, agentSideResult := s.agentRegisters(publicId, "") s.NotNil(res.err) // verify it is the correct error and not an id mismatch. s.True(strings.Contains(res.err.Error(), "could not allocate a new unique id")) @@ -162,7 +164,7 @@ func (s *AdminTestSuite) Test_agentDuplicateId() { func (s *AdminTestSuite) Test_connectClient() { publicId := "abc" - serverRes, agentRes := s.agentRegisters(publicId, "abc") + serverRes, agentRes := s.agentRegisters(publicId, publicId) s.Nil(serverRes.err) s.Nil(agentRes.err) @@ -175,14 +177,14 @@ func (s *AdminTestSuite) Test_connectClient() { return s.connectClient(publicId, serverToClientRW, data) }, func() any { - return s.clientConnection(agentRes, data) + return s.clientConnection("0", agentRes, data) }) - // bidirectional communciataion check + // bidirectional communication check clientConn := res[0].(*clientConnection) agentToServerYamux := res[1].(io.ReadWriter) go clientConn.Synchronize() - s.bidirectionalConnectionCheck(clientToServerRW, agentToServerYamux) + s.bidirectionalConnectionCheck("mymessage", clientToServerRW, agentToServerYamux) // verify state state := s.admin.CreateNotifification() @@ -202,30 +204,76 @@ func (s *AdminTestSuite) Test_connectClient() { s.True(strings.Contains(err.Error(), "closed")) } -func (s *AdminTestSuite) Test_multipleAgentsAndClients() { +func (s *AdminTestSuite) Test_MultipleAgentsAndClients() { + clientCounts := []int{23, 5, 3, 1} + + wg := sync.WaitGroup{} + for iagent, clientCount := range clientCounts { + wg.Add(1) + data := fmt.Sprintf("Agent test msg %d ", iagent) + go func() { + defer wg.Done() + publicId := fmt.Sprintf("abc%d", iagent) + serverRes, agentRes := s.agentRegisters(publicId, publicId) + s.Nil(serverRes.err) + s.Nil(agentRes.err) + for i := 0; i < clientCount; i++ { + // cannot yet create clients in parallel. Problem is that calling + // listener.Accept on the agent side can yield a connection from another + // client. The listener omehow needs to publish the connections that ere + // created in a map base on client id. The client can then retrieve the + // connection based on the client id and should also wait until the + // connection is available. + iclient := i + client := fmt.Sprintf("client %d/%d", iagent, iclient) + s.connectClientToAgent(client, publicId, data, agentRes) + } + }() + } + wg.Wait() } -func (s *AdminTestSuite) bidirectionalConnectionCheck(clientToServerRW io.ReadWriteCloser, agentToServerYamux io.ReadWriter) { - data1 := "mytestdata" - data2 := "mytestdata-2" +func (s *AdminTestSuite) connectClientToAgent(client string, publicId string, data string, agentRes AgentRegisterResult) { + serverToClientRW, clientToServerRW := s.createPipe() + res := testsupport.RunAndWait( + &s.Suite, + func() any { + return s.connectClient(publicId, serverToClientRW, data) + }, + func() any { + return s.clientConnection(client, agentRes, data) + }) + + // bidirectional communication check + clientConn := res[0].(*clientConnection) + agentToServerYamux := res[1].(io.ReadWriter) + go clientConn.Synchronize() + msg := fmt.Sprintf("end-to-end %s", client) + s.bidirectionalConnectionCheck(msg, clientToServerRW, agentToServerYamux) +} + +func (s *AdminTestSuite) bidirectionalConnectionCheck(msg string, clientToServerRW io.ReadWriteCloser, agentToServerYamux io.ReadWriter) { + data1 := msg + " client->agent" + data2 := msg + " agent->client" + log.Printf("BIDIRECTIONAL CHECK %v -> %v", msg, agentToServerYamux) testsupport.RunAndWait( &s.Suite, func() any { testsupport.AssertWriteData(&s.Suite, data1, clientToServerRW) - testsupport.AssertReadData(&s.Suite, data2, agentToServerYamux) + testsupport.AssertReadData(&s.Suite, data2, clientToServerRW) return nil }, func() any { testsupport.AssertReadData(&s.Suite, data1, agentToServerYamux) - testsupport.AssertWriteData(&s.Suite, data2, clientToServerRW) + testsupport.AssertWriteData(&s.Suite, data2, agentToServerYamux) return nil }) } func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() { publicId := "abc" - serverRes, agentRes := s.agentRegisters(publicId, "abc") + serverRes, agentRes := s.agentRegisters(publicId, publicId) s.Nil(serverRes.err) s.Nil(agentRes.err) @@ -243,10 +291,10 @@ func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() { // Registering an agent on the server -func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serverRW io.ReadWriteCloser) (*agentConnection, error) { +func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serverToAgentRW io.ReadWriteCloser) (*agentConnection, error) { agentConn, err := s.admin.AddAgent( s.hostKey, models.RendezVousId(publicId), comms.EnvironmentInfo{}, - serverRW) + serverToAgentRW) if err != nil { return nil, err } @@ -255,13 +303,13 @@ func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serv } // Agent activities registring on the server -func (s *AdminTestSuite) agentRegistration(agentRW io.ReadWriteCloser) AgentRegisterResult { +func (s *AdminTestSuite) agentRegistration(agentToServerRW io.ReadWriteCloser) AgentRegisterResult { // verify registration message received - agentRegistration, err := comms.ReceiveRegistrationMessage(agentRW) + agentRegistration, err := comms.ReceiveRegistrationMessage(agentToServerRW) if err != nil { return AgentRegisterResult{err: err} } - commChannel, err := comms.NewCommChannel(comms.Agent, agentRW) + commChannel, err := comms.NewCommChannel(comms.Agent, agentToServerRW) if err != nil { return AgentRegisterResult{registration: agentRegistration, err: err} } @@ -281,13 +329,15 @@ func (s *AdminTestSuite) connectClient(publicId string, serverToClientRW io.Read return clientConn } -func (s *AdminTestSuite) clientConnection(agentRes AgentRegisterResult, data string) any { +func (s *AdminTestSuite) clientConnection(client string, agentRes AgentRegisterResult, data string) any { // agent listener := comms.NewAgentListener(agentRes.commChannel.Session) //.Connection from server over yamux agentToServerYamux, err := listener.Accept() s.Nil(err) + log.Printf("RESULT FROM ACCEPT %s %v", client, agentToServerYamux) // Test by receiving a message from the server testsupport.AssertReadData(&s.Suite, data, agentToServerYamux) + log.Printf("Asserted on read data: %v", data) return agentToServerYamux } diff --git a/pkg/support/concurrency/atomiccounter.go b/pkg/support/concurrency/atomiccounter.go index 1599a04..25920dd 100644 --- a/pkg/support/concurrency/atomiccounter.go +++ b/pkg/support/concurrency/atomiccounter.go @@ -14,9 +14,10 @@ func NewAtomicCounter() *AtomicCounter { } } -func (counter *AtomicCounter) IncrementAndGet() int { +func (counter *AtomicCounter) GetAndIncrement() int { counter.mutex.Lock() defer counter.mutex.Unlock() + val := counter.lastValue counter.lastValue++ - return counter.lastValue + return val } diff --git a/pkg/testsupport/utils.go b/pkg/testsupport/utils.go index 2a8e439..a50d445 100644 --- a/pkg/testsupport/utils.go +++ b/pkg/testsupport/utils.go @@ -21,9 +21,7 @@ func RunAndWait(suite *suite.Suite, functions ...TestFunction) []any { res := make([]any, len(functions)) for i, function := range functions { go func() { - defer func() { - wg.Done() - }() + defer wg.Done() res[i] = function() }() }