package admin import ( "context" "crypto/rand" "errors" "fmt" "git.wamblee.org/converge/pkg/comms" "git.wamblee.org/converge/pkg/models" "git.wamblee.org/converge/pkg/support/iowrappers" "git.wamblee.org/converge/pkg/testsupport" "github.com/stretchr/testify/suite" "go.uber.org/goleak" "io" "log" "net" "net/http" "strings" "sync" "testing" "time" ) // test case // // Overall: // - Connect agent, connect 2 clients // - Connect multiple agents and clients type AdminTestSuite struct { suite.Suite ctx context.Context cancelFunc context.CancelFunc pprofServer *http.Server admin *Admin hostKey []byte } func (s *AdminTestSuite) SetupSuite() { s.pprofServer = testsupport.StartPprof("") } func (s *AdminTestSuite) TearDownSuite() { testsupport.StopPprof(s.ctx, s.pprofServer) } func (s *AdminTestSuite) SetupTest() { ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second) s.ctx = ctx s.cancelFunc = cancelFunc s.admin = NewAdmin() s.hostKey = make([]byte, 100) s.NotNil(rand.Read(s.hostKey)) } func (s *AdminTestSuite) TearDownTest() { s.admin.Close() s.cancelFunc() goleak.VerifyNone(s.T()) } func TestAdminTestSuite(t *testing.T) { suite.Run(t, &AdminTestSuite{}) } type AddAgentResult struct { agentConn *agentConnection err error } func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) { agentToServerRW, serverToAgentRW := testsupport.CreatePipe(s.ctx) res := testsupport.RunAndWait( &s.Suite, func() any { agentConn, err := s.addAgent(requestedPublicId, assignedPublicId, serverToAgentRW) return AddAgentResult{ agentConn: agentConn, err: err, } }, func() any { res := s.agentRegistration(agentToServerRW) if assignedPublicId != "" { s.Nil(res.err) s.True(res.registration.Ok) s.Equal(s.hostKey, res.registration.HostPrivateKey) } return res }) return res[0].(AddAgentResult), res[1].(AgentRegisterResult) } type AgentRegisterResult struct { registration comms.AgentRegistration commChannel comms.CommChannel listener *testsupport.TestAgentListener err error } func (s *AdminTestSuite) Test_AgentRegisters() { publicId := "abc" res, _ := s.agentRegisters(publicId, publicId) s.Nil(res.err) agentConn := res.agentConn state := s.admin.CreateNotifification() s.Equal(1, len(state.Agents)) s.Equal(0, len(state.Clients)) s.Equal(agentConn.Info, state.Agents[agentConn.Info.Guid]) // Now unregister s.False(agentConn.CommChannel.Session.IsClosed()) s.Nil(s.admin.RemoveAgent(models.RendezVousId(publicId))) s.True(agentConn.CommChannel.Session.IsClosed()) // copy on write, orioginal state is unchanged s.Equal(1, len(state.Agents)) s.Equal(0, len(state.Clients)) state = s.admin.CreateNotifification() s.Equal(0, len(state.Agents)) s.Equal(0, len(state.Clients)) } func (s *AdminTestSuite) Test_ManyAgentsRegister() { N := 10 agentRegistrations := make([]testsupport.TestFunction, N) for i := range N { publicId := fmt.Sprintf("abc%d", i) agentRegistrations[i] = func() any { res, _ := s.agentRegisters(publicId, publicId) s.Nil(res.err) return res.agentConn } } res := testsupport.RunAndWait( &s.Suite, agentRegistrations...) state := s.admin.CreateNotifification() s.Equal(len(res), len(state.Agents)) s.Equal(0, len(state.Clients)) for _, entry := range res { agentConn := entry.(*agentConnection) s.Equal(agentConn.Info, state.Agents[agentConn.Info.Guid]) } } func (s *AdminTestSuite) Test_agentDuplicateId() { publicId := "abc" res, _ := s.agentRegisters(publicId, publicId) s.Nil(res.err) for i := range 100 { res, _ = s.agentRegisters(publicId, fmt.Sprintf("%s-%d", publicId, i)) s.Nil(res.err) } res, agentSideResult := s.agentRegisters(publicId, "") s.NotNil(res.err) // verify it is the correct error and not an id mismatch. s.True(strings.Contains(res.err.Error(), "could not allocate a new unique id")) s.False(agentSideResult.registration.Ok) } func (s *AdminTestSuite) Test_connectClient() error { publicId := "abc" serverRes, agentRes := s.agentRegisters(publicId, publicId) s.Nil(serverRes.err) s.Nil(agentRes.err) data := "connect client test msg" clientConn, err := s.connectClientToAgent("singleclient", publicId, data, agentRes) s.Nil(err) if err != nil { return err } // verify state state := s.admin.CreateNotifification() s.Equal(1, len(state.Agents)) s.Equal(1, len(state.Clients)) s.Equal(clientConn.Info, state.Clients[clientConn.Info.Guid]) // removing the client will close all connections, we test this by writing to the connections // after removing the client. err = s.admin.RemoveClient(clientConn) s.Nil(err) buf := make([]byte, 10) _, err = clientConn.clientConnection.Write(buf) s.NotNil(err) s.True(strings.Contains(err.Error(), "closed")) _, err = clientConn.agentConnection.Write(buf) s.NotNil(err) s.True(strings.Contains(err.Error(), "closed")) return nil } func (s *AdminTestSuite) Test_MultipleAgentsAndClients() { clientCounts := []int{10, 5, 37, 1, 29} wg := sync.WaitGroup{} for iagent, clientCount := range clientCounts { wg.Add(1) data := fmt.Sprintf("Agent test msg %d ", iagent) go func() { defer wg.Done() publicId := fmt.Sprintf("abc%d", iagent) serverRes, agentRes := s.agentRegisters(publicId, publicId) s.Nil(serverRes.err) s.Nil(agentRes.err) for i := 0; i < clientCount; i++ { // cannot yet create clients in parallel. Problem is that calling // listener.Accept on the agent side can yield a connection from another // client. The listener omehow needs to publish the connections that ere // created in a map base on client id. The client can then retrieve the // connection based on the client id and should also wait until the // connection is available. wg.Add(1) go func() { defer wg.Done() iclient := i client := fmt.Sprintf("client %d/%d", iagent, iclient) _, err := s.connectClientToAgent(client, publicId, data, agentRes) s.Nil(err) }() } }() } wg.Wait() } func (s *AdminTestSuite) connectClientToAgent( client string, publicId string, data string, agentRes AgentRegisterResult) (*clientConnection, error) { serverToClientRW, clientToServerRW := testsupport.CreatePipe(s.ctx) // TODO refactoring // - TestAgentListener should run in a separate go routine // Started by TestAgentSuite. // // TODO split up: // 1. server: connects to agent, agent: listens for connections // output: server: clientConnection with a.o. clientId // agent: listener // 2. communication check: // server: use yamux connection to send message // agent: retrieve connection from listener based on client id from clientConnection // -> yamux connection // exchange messages in two directions. // 3. birectional communication // full communication from client to agent through the converge server. // Connect server to agent res := testsupport.RunAndWait( &s.Suite, // Server: agent is already listening and accepts all connections and stores them based on clientId func() any { return s.connectClient(publicId, serverToClientRW) }) // bidirectional communication check clientConn := res[0].(*clientConnection) s.NotNil(clientConn) if clientConn == nil { return nil, errors.New("Client connection is nil") } clientId := clientConn.Info.ClientId // Retrieve the agent side connection for this client that was setup by the server agentToServerYamux, err := s.clientConnection(clientId, agentRes.listener) s.Nil(err) if err != nil { return nil, err } log.Println("Got agentToServerYamux") serverToAgentYamux := clientConn.agentConnection // Now first test the communication from server to agent over the just established connection testsupport.RunAndWait( &s.Suite, func() any { s.sendYamuxMsgServerToAgent(serverToAgentYamux, data) return nil }, func() any { s.receiveYamuxMsgServerToAgent(agentToServerYamux, data) return nil }) // Synchronize data between client and agent through the server go clientConn.Synchronize() msg := fmt.Sprintf("end-to-end %s", client) // verify bidirectional communication testsupport.BidirectionalConnectionCheck(&s.Suite, msg, clientToServerRW, agentToServerYamux) return clientConn, nil } func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() { publicId := "abc" serverRes, agentRes := s.agentRegisters(publicId, publicId) s.Nil(serverRes.err) s.Nil(agentRes.err) serverToClientRW, _ := testsupport.CreatePipe(s.ctx) _, err := s.admin.AddClient(models.RendezVousId(publicId+"sothatisunknown"), iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))) s.NotNil(err) // verify state state := s.admin.CreateNotifification() s.Equal(1, len(state.Agents)) s.Equal(0, len(state.Clients)) } // Registering an agent on the server func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serverToAgentRW io.ReadWriteCloser) (*agentConnection, error) { agentConn, err := s.admin.AddAgent( s.hostKey, models.RendezVousId(publicId), comms.EnvironmentInfo{}, serverToAgentRW) if err != nil { return nil, err } s.Equal(assignedPublicId, string(agentConn.Info.PublicId)) return agentConn, nil } // Agent activities registring on the server func (s *AdminTestSuite) agentRegistration(agentToServerRW io.ReadWriteCloser) AgentRegisterResult { // verify registration message received agentRegistration, err := comms.ReceiveRegistrationMessage(agentToServerRW) if err != nil { return AgentRegisterResult{err: err} } commChannel, err := comms.NewCommChannel(comms.Agent, agentToServerRW) if err != nil { return AgentRegisterResult{registration: agentRegistration, err: err} } s.NotNil(commChannel) baseListener := comms.NewAgentListener(commChannel.Session) listener := testsupport.NewTestListener(s.ctx, baseListener) return AgentRegisterResult{ registration: agentRegistration, commChannel: commChannel, listener: listener, err: nil, } } func (s *AdminTestSuite) connectClient(publicId string, serverToClientRW io.ReadWriteCloser) any { // server clientConn, err := s.admin.AddClient(models.RendezVousId(publicId), iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))) s.Nil(err) return clientConn } func (s *AdminTestSuite) clientConnection(clientId models.ClientId, listener *testsupport.TestAgentListener) (net.Conn, error) { // agent log.Printf("clientConnection: Getting connection for %v", clientId) agentToServerYamux, err := listener.GetConnection(string(clientId)) log.Printf("clientConnection: Got connection %v for client %v", agentToServerYamux, clientId) s.Nil(err) return agentToServerYamux, err } func (s *AdminTestSuite) sendYamuxMsgServerToAgent(serverToAgentYamux io.Writer, data string) { // server testsupport.AssertWriteData(&s.Suite, data, serverToAgentYamux) } func (s *AdminTestSuite) receiveYamuxMsgServerToAgent(agentToServerYamux io.Reader, data string) { // agent testsupport.AssertReadData(&s.Suite, data, agentToServerYamux) }