diff --git a/cmd/converge/prometheus.go b/cmd/converge/prometheus.go index 00f8033..98d78e6 100644 --- a/cmd/converge/prometheus.go +++ b/cmd/converge/prometheus.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "log" "net/http" + "time" ) const NAMESPACE = "converge" @@ -14,8 +15,8 @@ const NAMESPACE = "converge" var ( // remember previous values of agent guids and clients so that we can increment // the cumulative counters. - lastAgentGuids map[string]bool = make(map[string]bool) - lastClientGuids map[string]bool = make(map[string]bool) + lastAgents map[string]*models.Agent = make(map[string]*models.Agent) + lastClients map[string]*models.Client = make(map[string]*models.Client) cumulativeAgentCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: NAMESPACE, @@ -41,15 +42,26 @@ var ( agentStartTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: NAMESPACE, - Name: "agent_start_time_seconds", + Name: "agent_start_time_millis", Help: "Time the agent started", }, []string{"agent_guid"}) clientStartTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: NAMESPACE, - Name: "client_start_time_seconds", + Name: "client_start_time_millis", Help: "Time the client started", }, []string{"client_guid"}) + agentDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: NAMESPACE, + Name: "agent_duration_seconds", + Help: "Time the agent is already running", + }, []string{"agent_guid"}) + clientDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: NAMESPACE, + Name: "client_duration_seconds", + Help: "Time the client is already running", + }, []string{"client_guid"}) + agentInfo = promauto.NewGaugeVec( prometheus.GaugeOpts{ Namespace: NAMESPACE, @@ -114,10 +126,18 @@ func clientLabels(client models.Client) prometheus.Labels { } func agentActive(agent models.Agent) { + prevAgent := lastAgents[agent.Guid] + if prevAgent != nil && *prevAgent != agent { + removeAgentMetrics(prevAgent) + } agentInfo.With(agentLabels(agent)).Set(1) } func clientActive(client models.Client) { + prevClient := lastClients[client.Guid] + if prevClient != nil && *prevClient != client { + removeClientMetrics(prevClient) + } clientInfo.With(clientLabels(client)).Set(1) } @@ -131,43 +151,124 @@ func setupPrometheus(mux *http.ServeMux, notifications chan *models.State) { mux.Handle("/metrics", promhttp.Handler()) } +var prometheusChannel = make(chan func(), 10) + +// serialize notifidcations and periodi updates of the duration. + func updateMetrics(state *models.State) { + prometheusChannel <- func() { + updateMetricsImpl(state) + } +} + +func updateDurations() { + for _, agent := range lastAgents { + agentDuration. + With(prometheus.Labels{"agent_guid": agent.Guid}). + Set(float64(time.Now().Sub(agent.StartTime).Seconds())) + } + for _, client := range lastClients { + clientDuration. + With(prometheus.Labels{"client_guid": client.Guid}). + Set(float64(time.Now().Sub(client.StartTime).Seconds())) + } +} + +func init() { + go func() { + timer := time.NewTicker(1 * time.Second) + for { + select { + case <-timer.C: + prometheusChannel <- updateDurations + } + } + }() + + go func() { + for task := range prometheusChannel { + task() + } + }() +} + +func updateMetricsImpl(state *models.State) { // This implemnetation has a small probability that the metric will be in a partially // initialized state. This is however unlikely. It would lead to in incorrect determination // that an agent or client is not available. However, each agent and client will have a UID // so that is still possible to identify the client or agent even though some values might // become 0. - log.Printf("Got notification %v", *state) - - agentGuids := make(map[string]bool) - clientGuids := make(map[string]bool) + agentGuids := make(map[string]*models.Agent) + clientGuids := make(map[string]*models.Client) agentCount.Set(float64(len(state.Agents))) - agentInfo.Reset() - agentStartTime.Reset() + disconnectedAgents := make(map[string]*models.Agent) + for k, v := range lastAgents { + disconnectedAgents[k] = v + } for _, agent := range state.Agents { - if !lastAgentGuids[agent.Guid] { - agentStartTime. - With(prometheus.Labels{"agent_guid": agent.Guid}). - Set(float64(agent.StartTime.Unix())) + if lastAgents[agent.Guid] == nil { cumulativeAgentCount.Inc() } - agentGuids[agent.Guid] = true + delete(disconnectedAgents, agent.Guid) + agentGuids[agent.Guid] = &agent agentActive(agent) + agentStartTime. + With(prometheus.Labels{"agent_guid": agent.Guid}). + Set(float64(agent.StartTime.UnixMilli())) + agentDuration. + With(prometheus.Labels{"agent_guid": agent.Guid}). + Set(float64(time.Now().Sub(agent.StartTime).Seconds())) } - lastAgentGuids = agentGuids + for _, agent := range disconnectedAgents { + removeAgentMetrics(agent) + } + lastAgents = agentGuids clientCount.Set(float64(len(state.Clients))) - clientInfo.Reset() - clientStartTime.Reset() + + // with this app + disconnectedClients := make(map[string]*models.Client) + for k, v := range lastClients { + disconnectedClients[k] = v + } for _, client := range state.Clients { - if !lastClientGuids[client.Guid] { - clientStartTime.With(prometheus.Labels{"client_guid": client.Guid}).Set(float64(client.StartTime.Unix())) + if lastClients[client.Guid] == nil { cumulativeClientCount.Inc() } - clientGuids[client.Guid] = true + delete(disconnectedClients, client.Guid) + clientGuids[client.Guid] = &client clientActive(client) + clientStartTime. + With(prometheus.Labels{"client_guid": client.Guid}). + Set(float64(client.StartTime.UnixMilli())) + clientDuration. + With(prometheus.Labels{"client_guid": client.Guid}). + Set(float64(time.Now().Sub(client.StartTime).Seconds())) + } + for _, client := range disconnectedClients { + removeClientMetrics(client) + } + lastClients = clientGuids +} + +func removeAgentMetrics(agent *models.Agent) { + ok1 := agentInfo.Delete(agentLabels(*agent)) + guidLabels := prometheus.Labels{"agent_guid": agent.Guid} + ok2 := agentStartTime.Delete(guidLabels) + ok3 := agentDuration.Delete(guidLabels) + if !ok1 || !ok2 || !ok3 { + log.Printf("Could not delete all timeseries for agent %s", agent.Guid) + } +} + +func removeClientMetrics(client *models.Client) { + ok1 := clientInfo.Delete(clientLabels(*client)) + guidLabels := prometheus.Labels{"client_guid": client.Guid} + ok2 := clientStartTime.Delete(guidLabels) + ok3 := clientDuration.Delete(guidLabels) + if !ok1 || !ok2 || !ok3 { + log.Printf("Could not delete all timeseries for client %s", client.Guid) } - lastClientGuids = clientGuids } diff --git a/pkg/server/converge/matchmaker.go b/pkg/server/converge/matchmaker.go index fcde691..3cc22ae 100644 --- a/pkg/server/converge/matchmaker.go +++ b/pkg/server/converge/matchmaker.go @@ -86,7 +86,6 @@ func (converge *MatchMaker) Connect(wsProxyMode bool, publicId string, conn iowr } client, err := converge.admin.AddClient(publicId, conn) - converge.logStatus() if err != nil { if wsProxyMode { _ = comms.SendWithTimeout(channel, @@ -116,9 +115,8 @@ func (converge *MatchMaker) Connect(wsProxyMode bool, publicId string, conn iowr return fmt.Errorf("Error receiving environment info from client: %v", err) } client.EnvironmentInfo = clientEnvironment - converge.logStatus() } - + converge.logStatus() iowrappers2.SynchronizeStreams("client -- agent", client.ClientConnection, client.AgentConnection) return nil }