package admin import ( "context" "crypto/rand" "fmt" "git.wamblee.org/converge/pkg/comms" "git.wamblee.org/converge/pkg/models" "git.wamblee.org/converge/pkg/support/iowrappers" "git.wamblee.org/converge/pkg/testsupport" "github.com/stretchr/testify/suite" "go.uber.org/goleak" "io" "net/http" "strings" "testing" "time" ) // test case // // Overall: // - Connect agent, connect 2 clients // - Connect multiple agents and clients type AdminTestSuite struct { suite.Suite ctx context.Context cancelFunc context.CancelFunc pprofServer *http.Server admin *Admin hostKey []byte } func (s *AdminTestSuite) createPipe() (io.ReadWriteCloser, io.ReadWriteCloser) { bitpipe := testsupport.NewInmemoryConnection(s.ctx, "inmemory", 10) return bitpipe.Front(), bitpipe.Back() } func (s *AdminTestSuite) SetupSuite() { s.pprofServer = testsupport.StartPprof("") } func (s *AdminTestSuite) TearDownSuite() { testsupport.StopPprof(s.ctx, s.pprofServer) } func (s *AdminTestSuite) SetupTest() { ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second) s.ctx = ctx s.cancelFunc = cancelFunc s.admin = NewAdmin() s.hostKey = make([]byte, 100) s.NotNil(rand.Read(s.hostKey)) } func (s *AdminTestSuite) TearDownTest() { s.admin.Close() s.cancelFunc() goleak.VerifyNone(s.T()) } func TestAdminTestSuite(t *testing.T) { suite.Run(t, &AdminTestSuite{}) } type AddAgentResult struct { agentConn *agentConnection err error } type AgentRegisterResult struct { registration comms.AgentRegistration commChannel comms.CommChannel err error } func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) { agentRW, serverRW := s.createPipe() res := testsupport.RunAndWait( &s.Suite, func() any { agentConn, err := s.addAgent(requestedPublicId, assignedPublicId, serverRW) return AddAgentResult{ agentConn: agentConn, err: err, } }, func() any { res := s.agentRegistration(agentRW) if assignedPublicId != "" { s.Nil(res.err) s.True(res.registration.Ok) s.Equal(s.hostKey, res.registration.HostPrivateKey) } return res }) return res[0].(AddAgentResult), res[1].(AgentRegisterResult) } func (s *AdminTestSuite) Test_AgentRegisters() { publicId := "abc" res, _ := s.agentRegisters(publicId, publicId) s.Nil(res.err) agentConn := res.agentConn state := s.admin.CreateNotifification() s.Equal(1, len(state.Agents)) s.Equal(0, len(state.Clients)) s.Equal(agentConn.Info, state.Agents[agentConn.Info.Guid]) // Now unregister s.False(agentConn.CommChannel.Session.IsClosed()) s.admin.RemoveAgent(models.RendezVousId(publicId)) s.True(agentConn.CommChannel.Session.IsClosed()) // copy on write, orioginal state is unchanged s.Equal(1, len(state.Agents)) s.Equal(0, len(state.Clients)) state = s.admin.CreateNotifification() s.Equal(0, len(state.Agents)) s.Equal(0, len(state.Clients)) } func (s *AdminTestSuite) Test_ManyAgentsRegister() { N := 10 agentRegistrations := make([]testsupport.TestFunction, N) for i := range N { publicId := fmt.Sprintf("abc%d", i) agentRegistrations[i] = func() any { res, _ := s.agentRegisters(publicId, publicId) s.Nil(res.err) return res.agentConn } } res := testsupport.RunAndWait( &s.Suite, agentRegistrations...) state := s.admin.CreateNotifification() s.Equal(len(res), len(state.Agents)) s.Equal(0, len(state.Clients)) for _, entry := range res { agentConn := entry.(*agentConnection) s.Equal(agentConn.Info, state.Agents[agentConn.Info.Guid]) } } func (s *AdminTestSuite) Test_agentDuplicateId() { res, _ := s.agentRegisters("abc", "abc") s.Nil(res.err) for i := range 100 { res, _ = s.agentRegisters("abc", fmt.Sprintf("abc-%d", i)) s.Nil(res.err) } res, agentSideResult := s.agentRegisters("abc", "") s.NotNil(res.err) // verify it is the correct error and not an id mismatch. s.True(strings.Contains(res.err.Error(), "could not allocate a new unique id")) s.False(agentSideResult.registration.Ok) } func (s *AdminTestSuite) Test_connectClient() { publicId := "abc" serverRes, agentRes := s.agentRegisters(publicId, "abc") s.Nil(serverRes.err) s.Nil(agentRes.err) serverToClientRW, clientToServerRW := s.createPipe() data := "connect client test msg" res := testsupport.RunAndWait( &s.Suite, func() any { return s.connectClient(publicId, serverToClientRW, data) }, func() any { return s.clientConnection(agentRes, data) }) // bidirectional communciataion check clientConn := res[0].(*clientConnection) agentToServerYamux := res[1].(io.ReadWriter) go clientConn.Synchronize() s.bidirectionalConnectionCheck(clientToServerRW, agentToServerYamux) // verify state state := s.admin.CreateNotifification() s.Equal(1, len(state.Agents)) s.Equal(1, len(state.Clients)) s.Equal(clientConn.Info, state.Clients[clientConn.Info.Guid]) // will close the connections and as a result also th synchronize goroutine. s.cancelFunc() // Note: removing agents and clients after connection loss is the responsibility of the MatchMaker // Here we only test explicit removal of the agents and clients. } func (s *AdminTestSuite) Test_multipleAgentsAndClients() { } func (s *AdminTestSuite) bidirectionalConnectionCheck(clientToServerRW io.ReadWriteCloser, agentToServerYamux io.ReadWriter) { data1 := "mytestdata" data2 := "mytestdata-2" testsupport.RunAndWait( &s.Suite, func() any { testsupport.AssertWriteData(&s.Suite, data1, clientToServerRW) testsupport.AssertReadData(&s.Suite, data2, agentToServerYamux) return nil }, func() any { testsupport.AssertReadData(&s.Suite, data1, agentToServerYamux) testsupport.AssertWriteData(&s.Suite, data2, clientToServerRW) return nil }) } func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() { publicId := "abc" serverRes, agentRes := s.agentRegisters(publicId, "abc") s.Nil(serverRes.err) s.Nil(agentRes.err) serverToClientRW, _ := s.createPipe() _, err := s.admin.AddClient(models.RendezVousId(publicId+"sothatisunknown"), iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))) s.NotNil(err) // verify state state := s.admin.CreateNotifification() s.Equal(1, len(state.Agents)) s.Equal(0, len(state.Clients)) } // Registering an agent on the server func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serverRW io.ReadWriteCloser) (*agentConnection, error) { agentConn, err := s.admin.AddAgent( s.hostKey, models.RendezVousId(publicId), comms.EnvironmentInfo{}, serverRW) if err != nil { return nil, err } s.Equal(assignedPublicId, string(agentConn.Info.PublicId)) return agentConn, nil } // Agent activities registring on the server func (s *AdminTestSuite) agentRegistration(agentRW io.ReadWriteCloser) AgentRegisterResult { // verify registration message received agentRegistration, err := comms.ReceiveRegistrationMessage(agentRW) if err != nil { return AgentRegisterResult{err: err} } commChannel, err := comms.NewCommChannel(comms.Agent, agentRW) if err != nil { return AgentRegisterResult{registration: agentRegistration, err: err} } s.NotNil(commChannel) return AgentRegisterResult{registration: agentRegistration, commChannel: commChannel, err: nil} } func (s *AdminTestSuite) connectClient(publicId string, serverToClientRW io.ReadWriteCloser, data string) any { // server clientConn, err := s.admin.AddClient(models.RendezVousId(publicId), iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))) s.Nil(err) // Connection to agent over yamux serverToAgentYamux := clientConn.agentConnection // test by sending a message to the agent. testsupport.AssertWriteData(&s.Suite, data, serverToAgentYamux) return clientConn } func (s *AdminTestSuite) clientConnection(agentRes AgentRegisterResult, data string) any { // agent listener := comms.NewAgentListener(agentRes.commChannel.Session) //.Connection from server over yamux agentToServerYamux, err := listener.Accept() s.Nil(err) // Test by receiving a message from the server testsupport.AssertReadData(&s.Suite, data, agentToServerYamux) return agentToServerYamux }