test for ListenForAgentEvents implemented.
This commit is contained in:
parent
7b7827824e
commit
ea0b4282ba
@ -91,7 +91,8 @@ func SetupHeartBeat(commChannel CommChannel) {
|
|||||||
func ListenForAgentEvents(channel GOBChannel,
|
func ListenForAgentEvents(channel GOBChannel,
|
||||||
agentInfo func(agent EnvironmentInfo),
|
agentInfo func(agent EnvironmentInfo),
|
||||||
sessionInfo func(session SessionInfo),
|
sessionInfo func(session SessionInfo),
|
||||||
expiryTimeUpdate func(session ExpiryTimeUpdate)) {
|
expiryTimeUpdate func(session ExpiryTimeUpdate),
|
||||||
|
heartBeat func(heartbeat HeartBeat)) {
|
||||||
for {
|
for {
|
||||||
var result ConvergeMessage
|
var result ConvergeMessage
|
||||||
err := channel.Decoder.Decode(&result)
|
err := channel.Decoder.Decode(&result)
|
||||||
@ -115,6 +116,7 @@ func ListenForAgentEvents(channel GOBChannel,
|
|||||||
// for not ignoring, can also implement behavior
|
// for not ignoring, can also implement behavior
|
||||||
// when heartbeat not received but hearbeat is only
|
// when heartbeat not received but hearbeat is only
|
||||||
// intended to keep the connection up
|
// intended to keep the connection up
|
||||||
|
heartBeat(v)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
fmt.Printf(" Unknown type: %v %T\n", v, v)
|
fmt.Printf(" Unknown type: %v %T\n", v, v)
|
||||||
|
@ -165,6 +165,56 @@ func (s *AgentServerTestSuite) Test_Initialization() {
|
|||||||
|
|
||||||
func (s *AgentServerTestSuite) Test_ListenForAgentEvents() {
|
func (s *AgentServerTestSuite) Test_ListenForAgentEvents() {
|
||||||
|
|
||||||
|
agentEvents := []any{
|
||||||
|
NewEnvironmentInfo("myshell"),
|
||||||
|
NewSessionInfo("1", "sftp"),
|
||||||
|
NewExpiryTimeUpdate(time.Now().Add(1 * time.Minute)),
|
||||||
|
HeartBeat{},
|
||||||
|
}
|
||||||
|
const nevents = 100
|
||||||
|
eventTypesSent := make([]int, nevents, nevents)
|
||||||
|
testsupport.RunAndWait(
|
||||||
|
&s.Suite,
|
||||||
|
func() any {
|
||||||
|
channel := NewGOBChannel(s.agentConnection)
|
||||||
|
for i := range nevents {
|
||||||
|
ievent := rand.Int() % len(agentEvents)
|
||||||
|
eventTypesSent[i] = ievent
|
||||||
|
event := ConvergeMessage{
|
||||||
|
Value: agentEvents[ievent],
|
||||||
|
}
|
||||||
|
err := SendWithTimeout[ConvergeMessage](channel, event)
|
||||||
|
s.Nil(err)
|
||||||
|
}
|
||||||
|
// pending events will still be sent.
|
||||||
|
s.agentConnection.Close()
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
func() any {
|
||||||
|
eventTypesReceived := make([]int, nevents, nevents)
|
||||||
|
channel := NewGOBChannel(s.serverConnection)
|
||||||
|
i := 0
|
||||||
|
ListenForAgentEvents(channel,
|
||||||
|
func(agent EnvironmentInfo) {
|
||||||
|
eventTypesReceived[i] = 0
|
||||||
|
i++
|
||||||
|
},
|
||||||
|
func(session SessionInfo) {
|
||||||
|
eventTypesReceived[i] = 1
|
||||||
|
i++
|
||||||
|
},
|
||||||
|
func(expiryTimeUpdate ExpiryTimeUpdate) {
|
||||||
|
eventTypesReceived[i] = 2
|
||||||
|
i++
|
||||||
|
},
|
||||||
|
func(hearbeat HeartBeat) {
|
||||||
|
eventTypesReceived[i] = 3
|
||||||
|
i++
|
||||||
|
},
|
||||||
|
)
|
||||||
|
s.Equal(eventTypesSent, eventTypesReceived)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AgentServerTestSuite) Test_LIstenForServerEvents() {
|
func (s *AgentServerTestSuite) Test_LIstenForServerEvents() {
|
||||||
|
@ -67,7 +67,7 @@ type AgentRegistration struct {
|
|||||||
// Generic wrapper message required to send messages of arbitrary type
|
// Generic wrapper message required to send messages of arbitrary type
|
||||||
|
|
||||||
type ConvergeMessage struct {
|
type ConvergeMessage struct {
|
||||||
Value interface{}
|
Value any
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEnvironmentInfo(shell string) EnvironmentInfo {
|
func NewEnvironmentInfo(shell string) EnvironmentInfo {
|
||||||
|
@ -66,6 +66,9 @@ func (converge *MatchMaker) Register(publicId models.RendezVousId, conn io.ReadW
|
|||||||
func(expiry comms.ExpiryTimeUpdate) {
|
func(expiry comms.ExpiryTimeUpdate) {
|
||||||
agent.Info.SetExpiryTime(expiry.ExpiryTime)
|
agent.Info.SetExpiryTime(expiry.ExpiryTime)
|
||||||
converge.logStatus()
|
converge.logStatus()
|
||||||
|
},
|
||||||
|
func(heartbeat comms.HeartBeat) {
|
||||||
|
// Empty
|
||||||
})
|
})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
_ "runtime/pprof"
|
_ "runtime/pprof"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -30,6 +31,9 @@ func RunAndWait(suite *suite.Suite, functions ...TestFunction) []any {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func StartPprof(port string) *http.Server {
|
func StartPprof(port string) *http.Server {
|
||||||
|
if os.Getenv("PPROF") == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if port == "" {
|
if port == "" {
|
||||||
port = ":9000"
|
port = ":9000"
|
||||||
}
|
}
|
||||||
@ -49,6 +53,9 @@ func StartPprof(port string) *http.Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func StopPprof(ctx context.Context, server *http.Server) {
|
func StopPprof(ctx context.Context, server *http.Server) {
|
||||||
|
if os.Getenv("PPROF") == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
err := server.Shutdown(ctx)
|
err := server.Shutdown(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Error shutting down test pprof server")
|
log.Println("Error shutting down test pprof server")
|
||||||
|
Loading…
Reference in New Issue
Block a user