converge/cmd/converge/prometheus.go

308 lines
8.8 KiB
Go

package main
import (
"git.wamblee.org/converge/pkg/models"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net/http"
"time"
)
// NAMESPACE is the Prometheus namespace set on every metric declared in this file.
const NAMESPACE = "converge"
// Agents and clients of the most recently processed state, keyed by guid.
// They are compared against the next state to detect new arrivals (for the
// cumulative counters), changed label values and disconnects.
var (
	lastAgents  = make(map[string]models.Agent)
	lastClients = make(map[string]models.Client)
)

// Population metrics: how many agents and clients exist now and how many have
// been seen in total.
var (
	cumulativeAgentCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: NAMESPACE,
		Name:      "agent_count_total",
		Help:      "Total number of agents connected over time",
	})
	cumulativeClientCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: NAMESPACE,
		Name:      "client_count_total",
		Help:      "Total number of clients connected over time",
	})
	agentCount = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: NAMESPACE,
		Name:      "agent_count",
		Help:      "Current number of agents",
	})
	clientCount = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: NAMESPACE,
		Name:      "client_count",
		Help:      "Current number of clients",
	})
)

// Per-instance timing metrics, labelled only with the guid of the agent or
// client. Start times are set from StartTime.UnixMilli(), durations from the
// seconds elapsed since StartTime.
var (
	agentStartTime = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: NAMESPACE,
		Name:      "agent_start_time_millis",
		Help:      "Time the agent started",
	}, []string{"agent_guid"})
	clientStartTime = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: NAMESPACE,
		Name:      "client_start_time_millis",
		Help:      "Time the client started",
	}, []string{"client_guid"})
	agentDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: NAMESPACE,
		Name:      "agent_duration_seconds",
		Help:      "Time the agent is already running",
	}, []string{"agent_guid"})
	clientDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: NAMESPACE,
		Name:      "client_duration_seconds",
		Help:      "Time the client is already running",
	}, []string{"client_guid"})
)

// Info metrics: the value is always 1, the descriptive data is carried in the
// labels. The label names must match the keys produced by agentLabels and
// clientLabels respectively.
var (
	agentInfo = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: NAMESPACE,
			Name:      "agent_info",
			Help:      "A flexible gauge with dynamic labels, always set to 1",
		},
		[]string{
			"agent_guid",
			"agent_address",
			"agent_id",
			"agent_username",
			"agent_hostname",
			"agent_pwd",
			"agent_os",
			"agent_shell",
		},
	)
	clientInfo = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: NAMESPACE,
			Name:      "client_info",
			Help:      "A flexible gauge with dynamic labels, always set to 1",
		},
		[]string{
			"client_guid",
			"client_address",
			"client_id",
			"agent_id",
			"agent_guid",
			"client_sessiontype",
			"client_username",
			"client_hostname",
			"client_pwd",
			"client_os",
			"client_shell",
		},
	)
)
// agentLabels builds the complete label set of the agent_info metric for the
// given agent: its guid, remote address and public id, plus the user name,
// host name, working directory, OS and shell from its environment info.
func agentLabels(agent models.Agent) prometheus.Labels {
	env := agent.EnvironmentInfo
	labels := prometheus.Labels{
		"agent_guid":     agent.Guid,
		"agent_address":  agent.RemoteAddr,
		"agent_id":       agent.PublicId,
		"agent_username": env.Username,
		"agent_hostname": env.Hostname,
		"agent_pwd":      env.Pwd,
		"agent_os":       env.OS,
		"agent_shell":    env.Shell,
	}
	return labels
}
// clientLabels builds the complete label set of the client_info metric for the
// given client: its own guid, remote address, client id and session type, the
// public id and guid of the agent it refers to, plus the user name, host name,
// working directory, OS and shell from its environment info.
func clientLabels(client models.Client) prometheus.Labels {
	env := client.EnvironmentInfo
	labels := prometheus.Labels{
		"client_guid":        client.Guid,
		"client_address":     client.RemoteAddr,
		"client_id":          client.ClientId,
		"agent_id":           client.PublicId,
		"agent_guid":         client.AgentGuid,
		"client_sessiontype": client.SessionType,
		"client_username":    env.Username,
		"client_hostname":    env.Hostname,
		"client_pwd":         env.Pwd,
		"client_os":          env.OS,
		"client_shell":       env.Shell,
	}
	return labels
}
// agentActive publishes the per-agent metrics for an agent that is part of the
// current state.
//
// When lastAgents holds an entry for the same guid that differs from agent,
// the info series with the old label values is deleted first. After that the
// info gauge for the current labels is set to 1, the start time gauge is set
// to the agent's start time in Unix milliseconds and the duration gauge to the
// number of seconds elapsed since that start time.
func agentActive(agent models.Agent) {
	if previous, known := lastAgents[agent.Guid]; known && previous != agent {
		removeAgentInfoMetrics(previous)
	}

	byGuid := prometheus.Labels{"agent_guid": agent.Guid}

	agentInfo.With(agentLabels(agent)).Set(1)
	agentStartTime.With(byGuid).Set(float64(agent.StartTime.UnixMilli()))
	agentDuration.With(byGuid).Set(time.Since(agent.StartTime).Seconds())
}
// clientActive publishes the per-client metrics for a client that is part of
// the current state.
//
// When lastClients holds an entry for the same guid that differs from client,
// the info series with the old label values is deleted first. After that the
// info gauge for the current labels is set to 1, the start time gauge is set
// to the client's start time in Unix milliseconds and the duration gauge to
// the number of seconds elapsed since that start time.
func clientActive(client models.Client) {
	if previous, known := lastClients[client.Guid]; known && previous != client {
		removeClientInfoMetrics(previous)
	}

	byGuid := prometheus.Labels{"client_guid": client.Guid}

	clientInfo.With(clientLabels(client)).Set(1)
	clientStartTime.With(byGuid).Set(float64(client.StartTime.UnixMilli()))
	clientDuration.With(byGuid).Set(time.Since(client.StartTime).Seconds())
}
// setupPrometheus starts the background processing that keeps the metrics up
// to date and registers the Prometheus handler on mux under "/metrics".
//
// Three goroutines are started:
//   - a worker that executes the tasks queued on prometheusChannel one at a
//     time, so all metric bookkeeping runs on a single goroutine and needs no
//     further synchronization,
//   - a ticker that queues updateDurations once per second so scraped
//     durations stay current, and
//   - a forwarder that hands every state received on notifications to
//     updateMetrics.
//
// The forwarder returns when notifications is closed. The worker runs until
// prometheusChannel is closed; the ticker goroutine is never stopped.
func setupPrometheus(mux *http.ServeMux, notifications chan *models.State) {
	// Single consumer of prometheusChannel: serializes all metric updates.
	go func() {
		for task := range prometheusChannel {
			task()
		}
	}()

	// Periodically refresh the agent and client durations.
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		for range ticker.C {
			prometheusChannel <- updateDurations
		}
	}()

	// Forward state notifications from converge to the worker. Ranging over
	// the channel (instead of a bare receive in an endless loop) ends the
	// goroutine once the channel is closed; a bare receive would keep
	// yielding nil states, which updateMetricsImpl cannot handle.
	go func() {
		for state := range notifications {
			updateMetrics(state)
		}
	}()

	// Expose the metrics on the supplied mux.
	mux.Handle("/metrics", promhttp.Handler())
}
// prometheusChannel queues the metric update tasks (state updates and periodic
// duration refreshes). It is buffered with room for 10 pending tasks and is
// drained by the worker goroutine started in setupPrometheus.
var prometheusChannel = make(chan func(), 10)
// updateMetrics queues a task on prometheusChannel that applies state to the
// metrics. Going through the channel serializes notifications with the
// periodic updates of the durations. The call blocks while the channel's
// buffer is full.
func updateMetrics(state *models.State) {
	task := func() { updateMetricsImpl(state) }
	prometheusChannel <- task
}
// updateDurations refreshes the duration gauge of every agent in lastAgents
// and every client in lastClients to the number of seconds elapsed since the
// respective start time.
func updateDurations() {
	for _, a := range lastAgents {
		byGuid := prometheus.Labels{"agent_guid": a.Guid}
		agentDuration.With(byGuid).Set(time.Since(a.StartTime).Seconds())
	}

	for _, c := range lastClients {
		byGuid := prometheus.Labels{"client_guid": c.Guid}
		clientDuration.With(byGuid).Set(time.Since(c.StartTime).Seconds())
	}
}
// updateMetricsImpl brings all metrics in line with the given state.
//
// First for the agents and then for the clients it
//   - sets the current-count gauge to the number of entries in the state,
//   - increments the cumulative counter for every guid that was not part of
//     the previously processed state,
//   - refreshes the per-instance metrics of every entry in the state, and
//   - removes the metrics of every previously known entry that is no longer
//     part of the state.
//
// The processed entries then replace lastAgents / lastClients. The old
// contents must stay in place until all entries have been handled, because
// agentActive and clientActive look up the previous value there.
//
// The metrics are updated one by one, so there is a small probability that
// they are observed in a partially initialized state. That could lead to the
// incorrect conclusion that an agent or client is not available. Because every
// agent and client carries its guid, it can still be identified even though
// some values might become 0.
func updateMetricsImpl(state *models.State) {
	// Agents.
	agentCount.Set(float64(len(state.Agents)))

	currentAgents := make(map[string]models.Agent)
	for _, agent := range state.Agents {
		if _, seenBefore := lastAgents[agent.Guid]; !seenBefore {
			cumulativeAgentCount.Inc()
		}
		currentAgents[agent.Guid] = agent
		agentActive(agent)
	}
	// Whatever was known before but is missing now has disconnected.
	for guid, agent := range lastAgents {
		if _, stillThere := currentAgents[guid]; !stillThere {
			removeAgentMetrics(agent)
		}
	}
	lastAgents = currentAgents

	// Clients.
	clientCount.Set(float64(len(state.Clients)))

	currentClients := make(map[string]models.Client)
	for _, client := range state.Clients {
		if _, seenBefore := lastClients[client.Guid]; !seenBefore {
			cumulativeClientCount.Inc()
		}
		currentClients[client.Guid] = client
		clientActive(client)
	}
	for guid, client := range lastClients {
		if _, stillThere := currentClients[guid]; !stillThere {
			removeClientMetrics(client)
		}
	}
	lastClients = currentClients
}
// removeAgentInfoMetrics deletes the agent_info series whose labels match the
// given agent and returns the result of that deletion.
func removeAgentInfoMetrics(agent models.Agent) bool {
	labels := agentLabels(agent)
	return agentInfo.Delete(labels)
}
// removeAgentMetrics deletes all per-agent series of a disconnected agent.
//
// The info and start time series are deleted immediately; a failure to delete
// either one is logged. The duration series is deleted 60 seconds later, so
// that Prometheus is sure to have the last data; a failure of that
// deletion is logged separately.
func removeAgentMetrics(agent models.Agent) {
	infoDeleted := removeAgentInfoMetrics(agent)

	byGuid := prometheus.Labels{"agent_guid": agent.Guid}
	startDeleted := agentStartTime.Delete(byGuid)

	// Delayed deletion of the duration series.
	time.AfterFunc(60*time.Second, func() {
		if !agentDuration.Delete(byGuid) {
			log.Printf("Could not delete duration timeseries for agent %s", agent.Guid)
		}
	})

	if infoDeleted && startDeleted {
		return
	}
	log.Printf("Could not delete all timeseries for agent %s (info %v, starttime %v) ",
		agent.Guid, infoDeleted, startDeleted)
}
// removeClientInfoMetrics deletes the client_info series whose labels match
// the given client and returns the result of that deletion.
func removeClientInfoMetrics(client models.Client) bool {
	labels := clientLabels(client)
	return clientInfo.Delete(labels)
}
// removeClientMetrics deletes all per-client series of a disconnected client.
//
// The info and start time series are deleted immediately; a failure to delete
// either one is logged. The duration series is deleted 60 seconds later, so
// that Prometheus is sure to have the last data; a failure of that
// deletion is logged separately.
func removeClientMetrics(client models.Client) {
	infoDeleted := removeClientInfoMetrics(client)

	byGuid := prometheus.Labels{"client_guid": client.Guid}
	startDeleted := clientStartTime.Delete(byGuid)

	// Delayed deletion of the duration series.
	time.AfterFunc(60*time.Second, func() {
		if !clientDuration.Delete(byGuid) {
			log.Printf("Could not delete duration timeseries for client %s", client.Guid)
		}
	})

	if infoDeleted && startDeleted {
		return
	}
	log.Printf("Could not delete all timeseries for client %s (info %v, starttime %v)", client.Guid, infoDeleted, startDeleted)
}