converge/pkg/server/admin/admin_test.go

380 lines
11 KiB
Go

package admin
import (
"context"
"crypto/rand"
"errors"
"fmt"
"git.wamblee.org/converge/pkg/comms"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/support/iowrappers"
"git.wamblee.org/converge/pkg/testsupport"
"github.com/stretchr/testify/suite"
"go.uber.org/goleak"
"io"
"log"
"net"
"net/http"
"strings"
"sync"
"testing"
"time"
)
// AdminTestSuite is the testify suite that exercises Admin.
//
// Scenarios covered overall:
// - Connect agent, connect 2 clients
// - Connect multiple agents and clients
type AdminTestSuite struct {
suite.Suite
// ctx and cancelFunc are recreated by SetupTest for every test and
// cancelFunc is invoked by TearDownTest.
ctx context.Context
cancelFunc context.CancelFunc
// pprofServer is started in SetupSuite and stopped in TearDownSuite.
pprofServer *http.Server
// admin is the instance under test; SetupTest creates a new one per test.
admin *Admin
// hostKey is filled with random bytes in SetupTest and handed to AddAgent.
hostKey []byte
}
// SetupSuite starts the pprof server via testsupport.StartPprof and keeps a
// handle to it so that TearDownSuite can stop it again.
func (s *AdminTestSuite) SetupSuite() {
	server := testsupport.StartPprof("")
	s.pprofServer = server
}
// TearDownSuite stops the pprof server that SetupSuite started.
//
// NOTE(review): s.ctx is the context of the most recent test, which
// TearDownTest has already cancelled (and it is nil when no test ran) —
// confirm that StopPprof copes with such a context.
func (s *AdminTestSuite) TearDownSuite() {
	server := s.pprofServer
	testsupport.StopPprof(s.ctx, server)
}
// SetupTest builds a fresh fixture for each test: a test context (created
// with a 10 second timeout argument), a new Admin instance and a random
// 100-byte host key.
func (s *AdminTestSuite) SetupTest() {
	s.ctx, s.cancelFunc = testsupport.CreateTestContext(context.Background(), 10*time.Second)
	s.admin = NewAdmin()
	s.hostKey = make([]byte, 100)
	// rand.Read returns (n, err). Feeding both results directly into
	// s.NotNil would assert on n (an int, never nil) and use err only as the
	// failure message, so the error has to be checked explicitly.
	_, err := rand.Read(s.hostKey)
	s.Nil(err)
}
// TearDownTest releases the per-test fixture created by SetupTest.
// The order matters: the admin is closed and the test context cancelled
// first, and only then does goleak check for goroutines that are still
// running.
func (s *AdminTestSuite) TearDownTest() {
s.admin.Close()
s.cancelFunc()
goleak.VerifyNone(s.T())
}
func TestAdminTestSuite(t *testing.T) {
suite.Run(t, &AdminTestSuite{})
}
// AddAgentResult carries the outcome of the server-side addAgent call.
type AddAgentResult struct {
// agentConn is the connection returned by addAgent; it is nil when err is set.
agentConn *agentConnection
// err is the error returned by addAgent, if any.
err error
}
// agentRegisters performs one agent registration over a pipe created with
// testsupport.CreatePipe. The server side (addAgent) and the agent side
// (agentRegistration) are both handed to testsupport.RunAndWait and their
// outcomes are returned in that order.
//
// requestedPublicId is the id the agent asks for and assignedPublicId is the
// id the server is expected to assign. When assignedPublicId is empty the
// agent-side success assertions are skipped; callers use this when they
// expect the registration to fail.
func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) {
	agentToServerRW, serverToAgentRW := testsupport.CreatePipe(s.ctx)

	// Server side: register the agent with the admin.
	serverSide := func() any {
		conn, err := s.addAgent(requestedPublicId, assignedPublicId, serverToAgentRW)
		return AddAgentResult{agentConn: conn, err: err}
	}

	// Agent side: receive the registration reply and set up the channel.
	agentSide := func() any {
		outcome := s.agentRegistration(agentToServerRW)
		if assignedPublicId == "" {
			return outcome
		}
		s.Nil(outcome.err)
		s.True(outcome.registration.Ok)
		s.Equal(s.hostKey, outcome.registration.HostPrivateKey)
		return outcome
	}

	results := testsupport.RunAndWait(&s.Suite, serverSide, agentSide)
	return results[0].(AddAgentResult), results[1].(AgentRegisterResult)
}
// AgentRegisterResult carries the outcome of the agent side of a
// registration, as produced by agentRegistration.
type AgentRegisterResult struct {
// registration is the registration message the agent received.
registration comms.AgentRegistration
// commChannel is the agent-side channel created after the registration
// message was received.
commChannel comms.CommChannel
// listener wraps the agent listener on commChannel's session; tests use it
// to look up the agent-side connection for a given client id.
listener *testsupport.TestAgentListener
// err is the first error encountered; commChannel and listener are left
// unset when it is non-nil.
err error
}
// Test_AgentRegisters registers a single agent, verifies that it appears in
// the admin state, removes it again and verifies that its session is closed
// and that a state obtained earlier is not modified by the removal.
func (s *AdminTestSuite) Test_AgentRegisters() {
	const publicId = "abc"

	serverSide, _ := s.agentRegisters(publicId, publicId)
	s.Nil(serverSide.err)
	conn := serverSide.agentConn

	snapshot := s.admin.CreateNotifification()
	s.Len(snapshot.Agents, 1)
	s.Len(snapshot.Clients, 0)
	s.Equal(conn.Info, snapshot.Agents[conn.Info.Guid])

	// Unregistering the agent must close its session.
	s.False(conn.CommChannel.Session.IsClosed())
	s.Nil(s.admin.RemoveAgent(models.RendezVousId(publicId)))
	s.True(conn.CommChannel.Session.IsClosed())

	// Copy on write: the state obtained before the removal is unchanged.
	s.Len(snapshot.Agents, 1)
	s.Len(snapshot.Clients, 0)

	// A newly created state no longer contains the agent.
	snapshot = s.admin.CreateNotifification()
	s.Len(snapshot.Agents, 0)
	s.Len(snapshot.Clients, 0)
}
// Test_ManyAgentsRegister registers ten agents with distinct public ids
// through a single testsupport.RunAndWait call and verifies that every one of
// them is present in the admin state afterwards.
func (s *AdminTestSuite) Test_ManyAgentsRegister() {
	const agentCount = 10

	registrations := make([]testsupport.TestFunction, 0, agentCount)
	for i := 0; i < agentCount; i++ {
		publicId := fmt.Sprintf("abc%d", i)
		registrations = append(registrations, func() any {
			serverSide, _ := s.agentRegisters(publicId, publicId)
			s.Nil(serverSide.err)
			return serverSide.agentConn
		})
	}

	conns := testsupport.RunAndWait(&s.Suite, registrations...)

	state := s.admin.CreateNotifification()
	s.Equal(len(conns), len(state.Agents))
	s.Len(state.Clients, 0)
	for _, item := range conns {
		conn := item.(*agentConnection)
		s.Equal(conn.Info, state.Agents[conn.Info.Guid])
	}
}
// Test_agentDuplicateId registers the same public id over and over. The first
// registration is expected to get the requested id, the next 100 are expected
// to get "<id>-<i>", and the registration after that is expected to fail
// because no further unique id can be allocated.
func (s *AdminTestSuite) Test_agentDuplicateId() {
	publicId := "abc"
	res, _ := s.agentRegisters(publicId, publicId)
	s.Nil(res.err)

	for i := range 100 {
		res, _ = s.agentRegisters(publicId, fmt.Sprintf("%s-%d", publicId, i))
		s.Nil(res.err)
	}

	res, agentSideResult := s.agentRegisters(publicId, "")
	// s.NotNil only records a failure and does not stop the test, so the
	// message may only be inspected when an error is actually present;
	// otherwise res.err.Error() would panic on a nil error.
	if s.NotNil(res.err) {
		// verify it is the correct error and not an id mismatch.
		s.True(strings.Contains(res.err.Error(), "could not allocate a new unique id"))
	}
	s.False(agentSideResult.registration.Ok)
}
// Test_connectClient registers one agent, connects a single client to it,
// verifies the admin state and then checks that removing the client closes
// both the client-side and the agent-side connection.
//
// NOTE(review): test methods normally return nothing; the error result is
// kept so the signature stays unchanged and is presumably ignored by the
// suite runner.
func (s *AdminTestSuite) Test_connectClient() error {
	publicId := "abc"
	serverRes, agentRes := s.agentRegisters(publicId, publicId)
	s.Nil(serverRes.err)
	s.Nil(agentRes.err)
	// Without a successful registration agentRes carries no listener, so
	// there is nothing a client could be connected to.
	if err := errors.Join(serverRes.err, agentRes.err); err != nil {
		return err
	}

	data := "connect client test msg"
	clientConn, err := s.connectClientToAgent("singleclient", publicId, data, agentRes)
	s.Nil(err)
	if err != nil {
		return err
	}

	// verify state
	state := s.admin.CreateNotifification()
	s.Equal(1, len(state.Agents))
	s.Equal(1, len(state.Clients))
	s.Equal(clientConn.Info, state.Clients[clientConn.Info.Guid])

	// removing the client will close all connections, we test this by writing to the connections
	// after removing the client.
	err = s.admin.RemoveClient(clientConn)
	s.Nil(err)

	buf := make([]byte, 10)
	// s.NotNil does not abort the test, so err.Error() is only called when
	// the write really failed; a nil error is reported as a plain assertion
	// failure instead of a panic.
	_, err = clientConn.clientConnection.Write(buf)
	if s.NotNil(err) {
		s.True(strings.Contains(err.Error(), "closed"))
	}
	_, err = clientConn.agentConnection.Write(buf)
	if s.NotNil(err) {
		s.True(strings.Contains(err.Error(), "closed"))
	}
	return nil
}
// Test_MultipleAgentsAndClients registers five agents concurrently and
// connects a different number of clients (10, 5, 37, 1 and 29) to each of
// them, every client in its own goroutine. The test returns once all agent
// and client goroutines have finished.
func (s *AdminTestSuite) Test_MultipleAgentsAndClients() {
	clientCounts := []int{10, 5, 37, 1, 29}
	var wg sync.WaitGroup
	for iagent, clientCount := range clientCounts {
		wg.Add(1)
		data := fmt.Sprintf("Agent test msg %d ", iagent)
		go func() {
			defer wg.Done()
			publicId := fmt.Sprintf("abc%d", iagent)
			serverRes, agentRes := s.agentRegisters(publicId, publicId)
			// Evaluate both assertions so each failure is reported, then stop:
			// after a failed registration agentRes has no listener and the
			// client goroutines below would panic outside the test goroutine.
			registered := s.Nil(serverRes.err)
			registered = s.Nil(agentRes.err) && registered
			if !registered {
				return
			}
			for i := 0; i < clientCount; i++ {
				// Clients of one agent connect in parallel.
				// connectClientToAgent looks up the agent-side connection by
				// client id on the listener rather than taking whichever
				// connection is accepted next.
				// The Add happens while this goroutine still holds its own
				// count, so wg cannot reach zero prematurely.
				wg.Add(1)
				go func() {
					defer wg.Done()
					client := fmt.Sprintf("client %d/%d", iagent, i)
					_, err := s.connectClientToAgent(client, publicId, data, agentRes)
					s.Nil(err)
				}()
			}
		}()
	}
	wg.Wait()
}
// connectClientToAgent connects a new client to the agent registered under
// publicId and checks the resulting connections in three steps:
//
//  1. the server adds the client (connectClient),
//  2. data is sent from the server to the agent over the connection that
//     was set up for this client,
//  3. after starting Synchronize, traffic is checked in both directions
//     between the client end of the pipe and the agent-side connection.
//
// client is only used to build the end-to-end test message. agentRes must
// come from a successful registration because its listener is used to find
// the agent-side connection. On success the server-side client connection is
// returned.
func (s *AdminTestSuite) connectClientToAgent(
	client string, publicId string, data string, agentRes AgentRegisterResult) (*clientConnection, error) {
	serverToClientRW, clientToServerRW := testsupport.CreatePipe(s.ctx)

	// TODO refactoring
	//   - TestAgentListener should run in a separate go routine
	//     Started by TestAgentSuite.
	//
	// TODO split up:
	//  1. server: connects to agent, agent: listens for connections
	//     output: server: clientConnection with a.o. clientId
	//     agent: listener
	//  2. communication check:
	//     server: use yamux connection to send message
	//     agent: retrieve connection from listener based on client id from clientConnection
	//     -> yamux connection
	//     exchange messages in two directions.
	//  3. bidirectional communication
	//     full communication from client to agent through the converge server.

	// Step 1, server side: the agent is already listening, accepts all
	// connections and stores them by client id.
	addClient := func() any {
		return s.connectClient(publicId, serverToClientRW)
	}
	results := testsupport.RunAndWait(&s.Suite, addClient)

	conn := results[0].(*clientConnection)
	if !s.NotNil(conn) {
		return nil, errors.New("Client connection is nil")
	}

	// Look up the agent-side connection the server set up for this client.
	agentSideConn, err := s.clientConnection(conn.Info.ClientId, agentRes.listener)
	s.Nil(err)
	if err != nil {
		return nil, err
	}
	log.Println("Got agentToServerYamux")
	serverSideConn := conn.agentConnection

	// Step 2: server-to-agent check over the connection just established.
	sendToAgent := func() any {
		s.sendYamuxMsgServerToAgent(serverSideConn, data)
		return nil
	}
	receiveOnAgent := func() any {
		s.receiveYamuxMsgServerToAgent(agentSideConn, data)
		return nil
	}
	testsupport.RunAndWait(&s.Suite, sendToAgent, receiveOnAgent)

	// Step 3: let the server relay between client and agent, then verify
	// communication in both directions.
	go conn.Synchronize()
	endToEndMsg := "end-to-end " + client
	testsupport.BidirectionalConnectionCheck(&s.Suite, endToEndMsg, clientToServerRW, agentSideConn)

	return conn, nil
}
// Test_connectClientUnknownRendezVousId registers one agent and then tries to
// add a client for a rendez-vous id that no agent registered under. AddClient
// must return an error and the admin state must still contain only the agent.
func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() {
	const publicId = "abc"

	serverRes, agentRes := s.agentRegisters(publicId, publicId)
	s.Nil(serverRes.err)
	s.Nil(agentRes.err)

	// Only the server end of the pipe is needed for this test.
	serverToClientRW, _ := testsupport.CreatePipe(s.ctx)
	unknownId := models.RendezVousId(publicId + "sothatisunknown")
	clientRW := iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))
	_, err := s.admin.AddClient(unknownId, clientRW)
	s.NotNil(err)

	// The failed attempt must not have registered a client.
	state := s.admin.CreateNotifification()
	s.Len(state.Agents, 1)
	s.Len(state.Clients, 0)
}
// addAgent is the server side of an agent registration: it calls
// Admin.AddAgent with the suite's host key, the requested publicId and an
// empty EnvironmentInfo. On success it asserts that the id assigned by the
// server equals assignedPublicId and returns the agent connection; on
// failure it returns nil and the error without asserting anything.
func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serverToAgentRW io.ReadWriteCloser) (*agentConnection, error) {
	requestedId := models.RendezVousId(publicId)
	conn, err := s.admin.AddAgent(s.hostKey, requestedId, comms.EnvironmentInfo{}, serverToAgentRW)
	if err != nil {
		return nil, err
	}

	assignedId := string(conn.Info.PublicId)
	s.Equal(assignedPublicId, assignedId)
	return conn, nil
}
// agentRegistration is the agent side of a registration: it receives the
// registration message from agentToServerRW, creates the agent-side
// CommChannel on the same stream and wraps an agent listener for the
// channel's session in a test listener.
//
// A failure to receive the registration message yields a result that carries
// only the error; a failure to create the channel yields the registration
// message together with the error.
func (s *AdminTestSuite) agentRegistration(agentToServerRW io.ReadWriteCloser) AgentRegisterResult {
	// verify registration message received
	registration, err := comms.ReceiveRegistrationMessage(agentToServerRW)
	if err != nil {
		return AgentRegisterResult{err: err}
	}
	result := AgentRegisterResult{registration: registration}

	channel, err := comms.NewCommChannel(comms.Agent, agentToServerRW)
	if err != nil {
		result.err = err
		return result
	}
	s.NotNil(channel)

	result.commChannel = channel
	result.listener = testsupport.NewTestListener(s.ctx, comms.NewAgentListener(channel.Session))
	return result
}
// connectClient is the server side of a client connect: it wraps
// serverToClientRW together with a dummy remote address, adds the client to
// the admin for the given publicId and asserts that this succeeded. The
// client connection returned by AddClient is passed back as any so that the
// call can be used as a testsupport test function.
func (s *AdminTestSuite) connectClient(publicId string, serverToClientRW io.ReadWriteCloser) any {
	remoteAddr := testsupport.DummyRemoteAddr("remoteaddr")
	clientRW := iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, remoteAddr)

	conn, err := s.admin.AddClient(models.RendezVousId(publicId), clientRW)
	s.Nil(err)
	return conn
}
// clientConnection is the agent side of a client connect: it asks the
// listener for the connection belonging to clientId, logs before and after
// the lookup, asserts that no error occurred and returns the connection
// together with that error.
func (s *AdminTestSuite) clientConnection(clientId models.ClientId, listener *testsupport.TestAgentListener) (net.Conn, error) {
	log.Printf("clientConnection: Getting connection for %v", clientId)
	conn, err := listener.GetConnection(string(clientId))
	log.Printf("clientConnection: Got connection %v for client %v", conn, clientId)

	s.Nil(err)
	return conn, err
}
// sendYamuxMsgServerToAgent is the server half of the server-to-agent check:
// it writes data to serverToAgentYamux through testsupport.AssertWriteData.
func (s *AdminTestSuite) sendYamuxMsgServerToAgent(serverToAgentYamux io.Writer, data string) {
	base := &s.Suite
	testsupport.AssertWriteData(base, data, serverToAgentYamux)
}
// receiveYamuxMsgServerToAgent is the agent half of the server-to-agent
// check: it hands the expected data and agentToServerYamux to
// testsupport.AssertReadData.
func (s *AdminTestSuite) receiveYamuxMsgServerToAgent(agentToServerYamux io.Reader, data string) {
	base := &s.Suite
	testsupport.AssertReadData(base, data, agentToServerYamux)
}