converge/pkg/comms/agentserver.go
Erik Brakkee d3cbf8388f Lots of refactoring.
Now hijacking the ssh connection setup in the listener to exchange some information before passing the connection on to the SSH server.

Next step is to do the full exchange of required information and to make it easy some simple Read and Write methods with timeouts are needed that use gob.
2024-09-08 11:16:49 +02:00

147 lines
3.1 KiB
Go

package comms
import (
"encoding/gob"
"fmt"
"github.com/hashicorp/yamux"
"io"
"log"
"time"
)
type CommChannel struct {
// a separet connection outside of the ssh session
SideChannel TCPChannel
Session *yamux.Session
}
type Role int
const (
Agent Role = iota
ConvergeServer
)
func NewCommChannel(role Role, wsConn io.ReadWriteCloser) (CommChannel, error) {
var commChannel CommChannel
switch role {
case Agent:
listener, err := yamux.Server(wsConn, nil)
if err != nil {
return CommChannel{}, err
}
commChannel = CommChannel{
Session: listener,
}
case ConvergeServer:
clientSession, err := yamux.Client(wsConn, nil)
if err != nil {
return CommChannel{}, err
}
commChannel = CommChannel{
Session: clientSession,
}
default:
panic(fmt.Errorf("Undefined role %d", role))
}
// communication between Agent and ConvergeServer
// Currently used only fof communication from Agent to ConvergeServer
switch role {
case Agent:
conn, err := commChannel.Session.OpenStream()
commChannel.SideChannel.Peer = conn
if err != nil {
return CommChannel{}, err
}
case ConvergeServer:
conn, err := commChannel.Session.Accept()
commChannel.SideChannel.Peer = conn
if err != nil {
return CommChannel{}, err
}
default:
panic(fmt.Errorf("Undefined role %d", role))
}
log.Println("Communication channel between agent and converge server established")
RegisterEventsWithGob()
commChannel.SideChannel.Encoder = gob.NewEncoder(commChannel.SideChannel.Peer)
commChannel.SideChannel.Decoder = gob.NewDecoder(commChannel.SideChannel.Peer)
// heartbeat
if role == Agent {
go func() {
for {
time.Sleep(10 * time.Second)
err := commChannel.SideChannel.Send(HeartBeat{})
if err != nil {
log.Println("Sending heartbeat to server failed")
}
}
}()
}
return commChannel, nil
}
// Sending an event to the other side
func ListenForAgentEvents(channel TCPChannel,
agentInfo func(agent AgentInfo),
sessionInfo func(session SessionInfo),
expiryTimeUpdate func(session ExpiryTimeUpdate)) {
for {
var result ConvergeMessage
err := channel.Decoder.Decode(&result)
if err != nil {
// TODO more clean solution, need to explicitly close when agent exits.
log.Printf("Exiting agent listener %v", err)
return
}
switch v := result.Value.(type) {
case AgentInfo:
agentInfo(v)
case SessionInfo:
sessionInfo(v)
case ExpiryTimeUpdate:
expiryTimeUpdate(v)
case HeartBeat:
// for not ignoring, can also implement behavior
// when heartbeat not received but hearbeat is only
// intended to keep the connection up
default:
fmt.Printf(" Unknown type: %T\n", v)
}
}
}
func ListenForServerEvents(channel CommChannel,
setUsernamePassword func(user UserPassword)) {
for {
var result ConvergeMessage
err := channel.SideChannel.Decoder.Decode(&result)
if err != nil {
// TODO more clean solution, need to explicitly close when agent exits.
log.Printf("Exiting agent listener %v", err)
return
}
switch v := result.Value.(type) {
case UserPassword:
setUsernamePassword(v)
default:
fmt.Printf(" Unknown type: %T\n", v)
}
}
}