converge/pkg/server/admin/admin_test.go

294 lines
8.3 KiB
Go

package admin
import (
"context"
"crypto/rand"
"fmt"
"git.wamblee.org/converge/pkg/comms"
"git.wamblee.org/converge/pkg/models"
"git.wamblee.org/converge/pkg/support/iowrappers"
"git.wamblee.org/converge/pkg/testsupport"
"github.com/stretchr/testify/suite"
"go.uber.org/goleak"
"io"
"net/http"
"strings"
"testing"
"time"
)
// AdminTestSuite is the testify suite holding the fixture shared by the Admin
// tests in this file.
//
// Planned scenarios overall:
//   - Connect agent, connect 2 clients
//   - Connect multiple agents and clients
type AdminTestSuite struct {
	suite.Suite

	// ctx and cancelFunc are assigned in SetupTest; cancelFunc is invoked in
	// TearDownTest.
	ctx        context.Context
	cancelFunc context.CancelFunc

	// pprofServer is assigned in SetupSuite and handed to StopPprof in
	// TearDownSuite.
	pprofServer *http.Server

	// admin is the Admin under test, recreated in SetupTest.
	admin *Admin

	// hostKey is the key handed to AddAgent; SetupTest fills it with random
	// bytes.
	hostKey []byte
}
// createPipe creates an in-memory connection using the suite's context and
// returns its two ends: the result of Front() first, the result of Back()
// second.
func (s *AdminTestSuite) createPipe() (io.ReadWriteCloser, io.ReadWriteCloser) {
	conn := testsupport.NewInmemoryConnection(s.ctx, "inmemory", 10)
	front, back := conn.Front(), conn.Back()
	return front, back
}
// SetupSuite is the suite-level setup hook. It starts the pprof server through
// testsupport.StartPprof and keeps the handle so TearDownSuite can stop it.
func (s *AdminTestSuite) SetupSuite() {
	server := testsupport.StartPprof("")
	s.pprofServer = server
}
// TearDownSuite is the suite-level teardown hook. It stops the pprof server
// that SetupSuite started.
//
// NOTE(review): s.ctx is whatever the most recent SetupTest assigned, and
// TearDownTest has already cancelled it by the time this runs - confirm that
// StopPprof is fine with a cancelled context.
func (s *AdminTestSuite) TearDownSuite() {
	ctx, server := s.ctx, s.pprofServer
	testsupport.StopPprof(ctx, server)
}
// SetupTest builds a fresh fixture before every test: a test context created
// with a 10 second duration, a new Admin, and a 100-byte random host key.
func (s *AdminTestSuite) SetupTest() {
	ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second)
	s.ctx = ctx
	s.cancelFunc = cancelFunc
	s.admin = NewAdmin()

	s.hostKey = make([]byte, 100)
	// rand.Read returns (n, err). Feeding both results directly into NotNil
	// asserts on n (an int, which is never nil) and uses err merely as the
	// failure message, so the check could never fail. Check err explicitly.
	_, err := rand.Read(s.hostKey)
	s.NoError(err)
}
// TearDownTest releases the per-test fixture: it closes the Admin, cancels the
// test context and finally lets goleak verify that no goroutines are left
// behind.
func (s *AdminTestSuite) TearDownTest() {
	// Shut everything down first so the leak check below sees the final state.
	s.admin.Close()
	s.cancelFunc()

	t := s.T()
	goleak.VerifyNone(t)
}
func TestAdminTestSuite(t *testing.T) {
suite.Run(t, &AdminTestSuite{})
}
// AddAgentResult carries the outcome of the server side of an agent
// registration, as produced by the addAgent helper.
type AddAgentResult struct {
	// agentConn is the connection returned by addAgent; nil when err is set.
	agentConn *agentConnection
	// err is the error returned by addAgent, if any.
	err error
}
// AgentRegisterResult carries the outcome of the agent side of a registration,
// as produced by the agentRegistration helper.
type AgentRegisterResult struct {
	// registration is the registration message the agent received.
	registration comms.AgentRegistration
	// commChannel is the agent-side communication channel; only set when the
	// whole registration succeeded.
	commChannel comms.CommChannel
	// err is the first error encountered on the agent side, if any.
	err error
}
// agentRegisters performs a complete agent registration over a fresh pipe.
//
// The server half (addAgent) and the agent half (agentRegistration) are both
// handed to testsupport.RunAndWait, each working on its own end of the pipe.
// requestedPublicId is the id the agent asks for; assignedPublicId is the id
// the server is expected to hand out. Callers pass an empty assignedPublicId
// when no id is expected to be assigned; the agent-side success assertions are
// skipped in that case.
//
// It returns the server-side result followed by the agent-side result.
func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) {
	agentRW, serverRW := s.createPipe()

	serverSide := func() any {
		conn, err := s.addAgent(requestedPublicId, assignedPublicId, serverRW)
		return AddAgentResult{agentConn: conn, err: err}
	}

	agentSide := func() any {
		outcome := s.agentRegistration(agentRW)
		if assignedPublicId == "" {
			// Nothing to assert here; the caller inspects the outcome.
			return outcome
		}
		s.Nil(outcome.err)
		s.True(outcome.registration.Ok)
		s.Equal(s.hostKey, outcome.registration.HostPrivateKey)
		return outcome
	}

	results := testsupport.RunAndWait(&s.Suite, serverSide, agentSide)
	return results[0].(AddAgentResult), results[1].(AgentRegisterResult)
}
// Test_AgentRegisters registers a single agent, checks that it shows up in the
// admin state, then removes it and checks that its session is closed and that
// it is gone from a newly created state.
func (s *AdminTestSuite) Test_AgentRegisters() {
	const id = "abc"

	serverSide, _ := s.agentRegisters(id, id)
	s.Nil(serverSide.err)
	registered := serverSide.agentConn

	// State while the agent is registered.
	before := s.admin.CreateNotifification()
	s.Equal(1, len(before.Agents))
	s.Equal(0, len(before.Clients))
	s.Equal(registered.Info, before.Agents[registered.Info.Guid])

	// Unregistering the agent must close its session.
	s.False(registered.CommChannel.Session.IsClosed())
	s.admin.RemoveAgent(models.RendezVousId(id))
	s.True(registered.CommChannel.Session.IsClosed())

	// The state obtained earlier must be unaffected by the removal (the
	// original note describes the admin state as copy on write).
	s.Equal(1, len(before.Agents))
	s.Equal(0, len(before.Clients))

	// A newly created state no longer contains the agent.
	after := s.admin.CreateNotifification()
	s.Equal(0, len(after.Agents))
	s.Equal(0, len(after.Clients))
}
// Test_ManyAgentsRegister registers ten agents with distinct ids through
// testsupport.RunAndWait and verifies that every one of them is present in the
// admin state afterwards.
func (s *AdminTestSuite) Test_ManyAgentsRegister() {
	const agentCount = 10

	registrations := make([]testsupport.TestFunction, 0, agentCount)
	for i := 0; i < agentCount; i++ {
		id := fmt.Sprintf("abc%d", i)
		registrations = append(registrations, func() any {
			serverSide, _ := s.agentRegisters(id, id)
			s.Nil(serverSide.err)
			return serverSide.agentConn
		})
	}

	conns := testsupport.RunAndWait(&s.Suite, registrations...)

	snapshot := s.admin.CreateNotifification()
	s.Equal(len(conns), len(snapshot.Agents))
	s.Equal(0, len(snapshot.Clients))
	for _, raw := range conns {
		conn := raw.(*agentConnection)
		s.Equal(conn.Info, snapshot.Agents[conn.Info.Guid])
	}
}
// Test_agentDuplicateId keeps registering agents that all request the id
// "abc". The first one is expected to get "abc", the next hundred "abc-0"
// through "abc-99", and the one after that is expected to be refused because
// no new unique id could be allocated.
func (s *AdminTestSuite) Test_agentDuplicateId() {
	const requested = "abc"

	serverSide, _ := s.agentRegisters(requested, requested)
	s.Nil(serverSide.err)

	for i := 0; i < 100; i++ {
		serverSide, _ = s.agentRegisters(requested, fmt.Sprintf("abc-%d", i))
		s.Nil(serverSide.err)
	}

	serverSide, agentSide := s.agentRegisters(requested, "")
	s.NotNil(serverSide.err)
	// Make sure this is the allocation failure and not an id mismatch.
	s.True(strings.Contains(serverSide.err.Error(), "could not allocate a new unique id"))
	s.False(agentSide.registration.Ok)
}
// Test_connectClient registers an agent, connects one client to it, checks the
// data path and the admin state, and finally verifies that removing the client
// closes both of the client's connections.
func (s *AdminTestSuite) Test_connectClient() {
	const publicId = "abc"

	serverSide, agentSide := s.agentRegisters(publicId, "abc")
	s.Nil(serverSide.err)
	s.Nil(agentSide.err)

	serverToClientRW, clientToServerRW := s.createPipe()
	const payload = "connect client test msg"
	results := testsupport.RunAndWait(
		&s.Suite,
		func() any { return s.connectClient(publicId, serverToClientRW, payload) },
		func() any { return s.clientConnection(agentSide, payload) },
	)

	// Bidirectional communication check.
	client := results[0].(*clientConnection)
	agentToServerYamux := results[1].(io.ReadWriter)
	go client.Synchronize()
	s.bidirectionalConnectionCheck(clientToServerRW, agentToServerYamux)

	// Verify state.
	snapshot := s.admin.CreateNotifification()
	s.Equal(1, len(snapshot.Agents))
	s.Equal(1, len(snapshot.Clients))
	s.Equal(client.Info, snapshot.Clients[client.Info.Guid])

	// Removing the client closes all of its connections; this is tested by
	// writing to the connections after the removal.
	s.admin.RemoveClient(client)

	expectClosed := func(err error) {
		s.NotNil(err)
		s.True(strings.Contains(err.Error(), "closed"))
	}
	scratch := make([]byte, 10)
	_, err := client.clientConnection.Write(scratch)
	expectClosed(err)
	_, err = client.agentConnection.Write(scratch)
	expectClosed(err)
}
// Test_multipleAgentsAndClients is a placeholder: its body is empty, so it
// currently passes without exercising anything.
// TODO: implement the multiple agents and clients scenario.
func (s *AdminTestSuite) Test_multipleAgentsAndClients() {
}
// bidirectionalConnectionCheck verifies that data travels in both directions
// between the client end of the pipe and the agent's stream.
//
// Two functions are handed to testsupport.RunAndWait:
//   - the client side writes data1 and then reads data2 on clientToServerRW;
//   - the agent side reads data1 and then writes data2 on agentToServerYamux.
//
// Each endpoint has exactly one reader and one writer, so the two functions do
// not compete for the same bytes, and data2 really flows from the agent back
// to the client. Previously both payloads were written on the client end and
// read on the agent end, which left the agent-to-client direction untested.
//
// NOTE(review): this relies on the caller having started the client
// connection's Synchronize so that data is relayed both ways - confirm.
func (s *AdminTestSuite) bidirectionalConnectionCheck(clientToServerRW io.ReadWriteCloser, agentToServerYamux io.ReadWriter) {
	data1 := "mytestdata"
	data2 := "mytestdata-2"
	testsupport.RunAndWait(
		&s.Suite,
		func() any {
			// Client side: send the request, then wait for the reply.
			testsupport.AssertWriteData(&s.Suite, data1, clientToServerRW)
			testsupport.AssertReadData(&s.Suite, data2, clientToServerRW)
			return nil
		},
		func() any {
			// Agent side: receive the request, then send the reply.
			testsupport.AssertReadData(&s.Suite, data1, agentToServerYamux)
			testsupport.AssertWriteData(&s.Suite, data2, agentToServerYamux)
			return nil
		})
}
// Test_connectClientUnknownRendezVousId registers one agent and then tries to
// add a client for a rendezvous id that was never registered. AddClient is
// expected to fail and the admin state must still show one agent and no
// clients.
func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() {
	const publicId = "abc"

	serverSide, agentSide := s.agentRegisters(publicId, "abc")
	s.Nil(serverSide.err)
	s.Nil(agentSide.err)

	serverToClientRW, _ := s.createPipe()
	unknownId := models.RendezVousId(publicId + "sothatisunknown")
	clientAddr := testsupport.DummyRemoteAddr("remoteaddr")
	clientRW := iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, clientAddr)

	_, err := s.admin.AddClient(unknownId, clientRW)
	s.NotNil(err)

	// Verify state.
	snapshot := s.admin.CreateNotifification()
	s.Equal(1, len(snapshot.Agents))
	s.Equal(0, len(snapshot.Clients))
}
// addAgent is the server half of an agent registration. It calls
// Admin.AddAgent with the suite's host key, the requested publicId, an empty
// EnvironmentInfo and the server end of the pipe.
//
// On success it asserts that the id assigned to the agent equals
// assignedPublicId and returns the connection. On failure it returns a nil
// connection together with the error from AddAgent.
func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serverRW io.ReadWriteCloser) (*agentConnection, error) {
	requested := models.RendezVousId(publicId)
	conn, err := s.admin.AddAgent(s.hostKey, requested, comms.EnvironmentInfo{}, serverRW)
	if err == nil {
		s.Equal(assignedPublicId, string(conn.Info.PublicId))
		return conn, nil
	}
	return nil, err
}
// agentRegistration is the agent half of a registration. It first receives
// the registration message on agentRW and then creates the agent-side
// communication channel on the same connection.
//
// If receiving the message fails, only err is set in the result. If creating
// the channel fails, the result holds the received registration and the error.
// Otherwise registration and commChannel are both set and err is nil.
func (s *AdminTestSuite) agentRegistration(agentRW io.ReadWriteCloser) AgentRegisterResult {
	// Verify that a registration message is received.
	registration, err := comms.ReceiveRegistrationMessage(agentRW)
	if err != nil {
		return AgentRegisterResult{err: err}
	}

	result := AgentRegisterResult{registration: registration}

	channel, err := comms.NewCommChannel(comms.Agent, agentRW)
	if err != nil {
		result.err = err
		return result
	}
	s.NotNil(channel)

	result.commChannel = channel
	return result
}
// connectClient is the server half of connecting a client. It adds a client
// for publicId on the given server-to-client connection, asserts that this
// succeeded, and then writes data on the client connection's agentConnection
// (the connection to the agent over yamux) so the agent side can read it.
//
// It returns the client connection wrapped in any, the result type used with
// testsupport.RunAndWait.
func (s *AdminTestSuite) connectClient(publicId string, serverToClientRW io.ReadWriteCloser, data string) any {
	// Server side: register the client.
	clientRW := iowrappers.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))
	clientConn, err := s.admin.AddClient(models.RendezVousId(publicId), clientRW)
	s.Nil(err)

	// Test the connection to the agent by sending it a message.
	testsupport.AssertWriteData(&s.Suite, data, clientConn.agentConnection)
	return clientConn
}
// clientConnection is the agent half of connecting a client. It creates an
// agent listener on the session of the agent's communication channel, accepts
// one connection from it, asserts that accepting succeeded, and then reads
// data from that connection to check that the server's message arrived.
//
// It returns the accepted connection wrapped in any, the result type used with
// testsupport.RunAndWait.
func (s *AdminTestSuite) clientConnection(agentRes AgentRegisterResult, data string) any {
	// Agent side: wait for the connection from the server over yamux.
	session := agentRes.commChannel.Session
	stream, err := comms.NewAgentListener(session).Accept()
	s.Nil(err)

	// Test by receiving the message sent by the server.
	testsupport.AssertReadData(&s.Suite, data, stream)
	return stream
}