converge/cmd/converge/server.go
Erik Brakkee cfccf04f9d working server
* administration appears coorect
* multiple clients for one agent
* logging of active connections
* simple echo server on the agent.
2024-07-20 13:35:49 +02:00

271 lines
6.7 KiB
Go

package main
import (
"cidebug/pkg/iowrappers"
"fmt"
"github.com/gorilla/websocket"
"github.com/hashicorp/yamux"
"io"
"log"
"net"
"net/http"
"regexp"
"sync"
"time"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
func handleWebSocket(w http.ResponseWriter, r *http.Request,
handler func(w http.ResponseWriter, r *http.Request, websockerConnection iowrappers.ReadWriteAddrCloser)) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("Error upgrading to WebSocket:", err)
return
}
wsConn := iowrappers.NewWebSocketConn(conn)
defer wsConn.Close()
handler(w, r, wsConn)
}
type WebSocketService struct {
handler func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser)
}
func (endpoint *WebSocketService) handle(w http.ResponseWriter, r *http.Request) {
handleWebSocket(w, r, endpoint.handler)
}
type Agent struct {
// server session
clientSession *yamux.Session
publicId string
startTime time.Time
}
type Client struct {
publicId string
agent net.Conn
client iowrappers.ReadWriteAddrCloser
startTime time.Time
}
func NewAgent(publicId string,
agentSession *yamux.Session) *Agent {
return &Agent{
clientSession: agentSession,
publicId: publicId,
startTime: time.Now(),
}
}
func NewClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser, agentConn net.Conn) *Client {
return &Client{
publicId: publicId,
agent: agentConn,
client: clientConn,
startTime: time.Now(),
}
}
type Admin struct {
// map of public id to agent
mutex sync.Mutex
agents map[string]*Agent
clients []*Client
}
func NewAdmin() *Admin {
admin := Admin{
mutex: sync.Mutex{},
agents: make(map[string]*Agent),
clients: make([]*Client, 0), // not strictly needed
}
return &admin
}
func (admin *Admin) logStatus() {
log.Printf("%-20s %-20s %-20s\n", "AGENT", "ACTIVE_SINCE", "REMOTE_ADDRESS")
for _, agent := range admin.agents {
agent.clientSession.RemoteAddr()
log.Printf("%-20s %-20s %-20s\n", agent.publicId,
agent.startTime.Format("2006-01-02 15:04:05"),
agent.clientSession.RemoteAddr().String())
}
log.Println("")
log.Printf("%-20s %-20s %-20s\n", "CLIENT", "ACTIVE_SINCE", "REMOTE_ADDRESS")
for _, client := range admin.clients {
log.Printf("%-20s %-20s %-20s", client.publicId,
client.startTime.Format("2006-01-02 15:04:05"),
client.client.RemoteAddr())
}
log.Printf("\n")
}
func (admin *Admin) addAgent(publicId string, conn io.ReadWriteCloser) (*Agent, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent != nil {
return nil, fmt.Errorf("A different agent with same publicId '%s' already registered", publicId)
}
session, err := yamux.Client(conn, nil)
if err != nil {
return nil, err
}
agent = NewAgent(publicId, session)
admin.agents[publicId] = agent
admin.logStatus()
return agent, nil
}
func (admin *Admin) addClient(publicId string, clientConn iowrappers.ReadWriteAddrCloser) (*Client, error) {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
// we should setup on-demend connections ot agents later.
return nil, fmt.Errorf("No agent found for publicId '%s'", publicId)
}
agentConn, err := agent.clientSession.Open()
if err != nil {
return nil, err
}
client := NewClient(publicId, clientConn, agentConn)
admin.clients = append(admin.clients, client)
admin.logStatus()
return client, nil
}
func (admin *Admin) RemoveAgent(publicId string) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
agent := admin.agents[publicId]
if agent == nil {
return fmt.Errorf("Cannot remove agent: '%s' not found", publicId)
}
log.Printf("Removing agent: '%s'", publicId)
err := agent.clientSession.Close()
if err != nil {
log.Printf("Could not close yamux client session for '%s'\n", publicId)
}
delete(admin.agents, publicId)
admin.logStatus()
return nil
}
func (admin *Admin) RemoveClient(client *Client) error {
admin.mutex.Lock()
defer admin.mutex.Unlock()
log.Printf("Removing client: '%s' created at %s\n", client.publicId,
client.startTime.Format("2006-01-02 15:04:05"))
// try to explicitly close connection to the agent.
_ = client.agent.Close()
_ = client.client.Close()
for i, _client := range admin.clients {
if _client == _client {
admin.clients = append(admin.clients[:i], admin.clients[i+1:]...)
break
}
}
admin.logStatus()
return nil
}
func (admin *Admin) Register(publicId string, conn io.ReadWriteCloser) error {
defer conn.Close()
// TODO: remove agent return value
agent, err := admin.addAgent(publicId, conn)
if err != nil {
return err
}
defer func() {
admin.RemoveAgent(publicId)
}()
log.Printf("Agent registered: '%s'\n", publicId)
for !agent.clientSession.IsClosed() {
time.Sleep(1 * time.Second)
}
return nil
}
func (admin *Admin) Connect(publicId string, conn iowrappers.ReadWriteAddrCloser) error {
defer conn.Close()
client, err := admin.addClient(publicId, conn)
if err != nil {
return err
}
defer func() {
admin.RemoveClient(client)
}()
log.Printf("Connecting client and agent: '%s'\n", publicId)
iowrappers.SynchronizeStreams(client.client, client.agent)
return nil
}
func (admin *Admin) log() {
log.Println("CONNECTIONS")
for _, agent := range admin.agents {
log.Println(agent.publicId)
}
log.Printf("\n")
}
func parsePublicId(path string) (publicId string, _ error) {
pattern := regexp.MustCompile("^/[^/]+/([^/]+)$")
matches := pattern.FindStringSubmatch(path)
if len(matches) != 2 {
return "", fmt.Errorf("Invalid URL path '%s'", path)
}
return matches[1], nil
}
func main() {
admin := NewAdmin()
registrationService := WebSocketService{
handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) {
publicId, err := parsePublicId(r.URL.Path)
if err != nil {
log.Printf("Cannot parse public id from url: '%v'\n", err)
return
}
log.Printf("Got registration connection: '%s'\n", publicId)
err = admin.Register(publicId, conn)
if err != nil {
log.Printf("Error %v\n", err)
}
},
}
clientService := WebSocketService{
handler: func(w http.ResponseWriter, r *http.Request, conn iowrappers.ReadWriteAddrCloser) {
publicId, err := parsePublicId(r.URL.Path)
if err != nil {
log.Printf("Cannot parse public id from url: '%v'\n", err)
return
}
log.Printf("Got client connection: '%s'\n", publicId)
err = admin.Connect(publicId, conn)
if err != nil {
log.Printf("Error %v\n", err)
}
},
}
http.HandleFunc("/agent/", registrationService.handle)
http.HandleFunc("/client/", clientService.handle)
// Start HTTP server
fmt.Println("WebSocket server listening on :8000")
log.Fatal(http.ListenAndServe(":8000", nil))
}