// Package main inspects a Kubernetes cluster and resolves Prometheus-operator
// ServiceMonitor and PodMonitor resources down to the concrete per-pod scrape
// endpoints (port + path) they select.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"slices"
	"strconv"
	"strings"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
)

// ServiceMonitor is a minimal local mirror of the Prometheus-operator
// ServiceMonitor CRD; only the fields this tool consumes are declared.
type ServiceMonitor struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              ServiceMonitorSpec `json:"spec"`
}

// ServiceMonitorSpec selects services (namespace + label selector) and lists
// the endpoints to scrape on the matched services.
type ServiceMonitorSpec struct {
	NamespaceSelector NamespaceSelector    `json:"namespaceSelector"`
	Selector          metav1.LabelSelector `json:"selector"`
	Endpoints         []Endpoint           `json:"endpoints"`
}

// Endpoint describes one scrape target: either a named port (Port) or a
// numeric container port (TargetPort), with optional path, scheme and
// relabeling rules.
type Endpoint struct {
	Path        string          `json:"path,omitempty"`
	Port        string          `json:"port"`
	TargetPort  string          `json:"targetPort"`
	Scheme      string          `json:"scheme,omitempty"`
	Relabelings []RelabelConfig `json:"relabelings,omitempty"`
}

// PodMonitor is a minimal local mirror of the Prometheus-operator PodMonitor CRD.
type PodMonitor struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              PodMonitorSpec `json:"spec"`
}

// PodMonitorSpec selects pods directly (no service indirection).
type PodMonitorSpec struct {
	NamespaceSelector   NamespaceSelector    `json:"namespaceSelector"`
	Selector            metav1.LabelSelector `json:"selector"`
	PodMetricsEndpoints []Endpoint           `json:"podMetricsEndpoints"`
}

// NamespaceSelector mirrors the operator's namespace selection: either every
// namespace (Any) or an explicit list (MatchNames).
type NamespaceSelector struct {
	Any        bool     `json:"any,omitempty"`
	MatchNames []string `json:"matchNames,omitempty"`
}

// RelabelConfig mirrors a Prometheus relabeling rule.
type RelabelConfig struct {
	Action       string   `json:"action,omitempty"`
	SourceLabels []string `json:"sourceLabels,omitempty"`
	Separator    string   `json:"separator,omitempty"`
	TargetLabel  string   `json:"targetLabel,omitempty"`
	Regex        string   `json:"regex,omitempty"`
	Modulus      uint64   `json:"modulus,omitempty"`
	Replacement  string   `json:"replacement,omitempty"`
}

// Cluster caches the namespaces, pods and services of a cluster together with
// the monitoring endpoints resolved from its ServiceMonitors and PodMonitors.
type Cluster struct {
	namespaces map[string]v1.Namespace
	clientset  *kubernetes.Clientset
	dynClient  *dynamic.DynamicClient

	// map of namespace to list of all pods
	pods     map[string][]v1.Pod
	services map[string][]v1.Service

	// map of namespace/podname -> ScrapingEndpoints
	endpoints map[string]MonitoringEndpoints
}

// MonitoringEndpoint is one resolved scrape target on a pod: the (named or
// numeric) port and the metrics path.
type MonitoringEndpoint struct {
	Port string
	Path string
}

// MonitoringEndpoints is a sortable, deduplicatable endpoint list.
type MonitoringEndpoints []MonitoringEndpoint

// CompareTo orders endpoints by port first, then path.
func (s MonitoringEndpoint) CompareTo(e MonitoringEndpoint) int {
	if res := strings.Compare(s.Port, e.Port); res != 0 {
		return res
	}
	return strings.Compare(s.Path, e.Path)
}

// Merge appends endpoints to the receiver, then sorts and deduplicates it
// in place.
func (s *MonitoringEndpoints) Merge(endpoints MonitoringEndpoints) {
	*s = append(*s, endpoints...)
	slices.SortFunc(*s, func(a, b MonitoringEndpoint) int {
		return a.CompareTo(b)
	})
	*s = slices.CompactFunc(*s, func(a, b MonitoringEndpoint) bool {
		return a.CompareTo(b) == 0
	})
}

// NewCluster snapshots all namespaces, pods and services of the cluster, then
// resolves every ServiceMonitor and PodMonitor into per-pod scrape endpoints.
func NewCluster(clientset *kubernetes.Clientset, dynClient *dynamic.DynamicClient) (*Cluster, error) {
	cluster := &Cluster{
		clientset:  clientset,
		dynClient:  dynClient,
		namespaces: make(map[string]v1.Namespace),
		pods:       make(map[string][]v1.Pod),
		services:   make(map[string][]v1.Service),
		endpoints:  make(map[string]MonitoringEndpoints),
	}

	nslist, err := cluster.clientset.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	fmt.Fprintf(os.Stderr, "Getting pods, services\n")
	for _, ns := range nslist.Items {
		cluster.namespaces[ns.Name] = ns
	}

	podList, err := cluster.clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	for _, pod := range podList.Items {
		cluster.pods[pod.Namespace] = append(cluster.pods[pod.Namespace], pod)
	}

	svcList, err := cluster.clientset.CoreV1().Services("").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	for _, svc := range svcList.Items {
		cluster.services[svc.Namespace] = append(cluster.services[svc.Namespace], svc)
	}

	if err := cluster.processServiceMonitors(); err != nil {
		return nil, err
	}
	// NOTE(review): method name has a typo ("procewss"); kept because other
	// files of this package may reference it.
	if err := cluster.procewssPodMonitors(); err != nil {
		return nil, err
	}

	// Sort and deduplicate every pod's endpoint list, reporting the result in
	// deterministic (sorted-key) order.
	podkeys := MapKeys(cluster.endpoints)
	slices.Sort(podkeys)
	for _, pod := range podkeys {
		monitors := cluster.endpoints[pod]
		slices.SortFunc(monitors, func(m1, m2 MonitoringEndpoint) int {
			return m1.CompareTo(m2)
		})
		monitors = slices.CompactFunc(monitors, func(m1, m2 MonitoringEndpoint) bool {
			return m1.CompareTo(m2) == 0
		})
		cluster.endpoints[pod] = monitors
		fmt.Fprintf(os.Stderr, "monitoring endpoint: %s: %v\n", pod, monitors)
	}
	return cluster, nil
}

// processPodMonitor resolves one PodMonitor to scrape endpoints on every pod
// it selects and records them in c.endpoints.
func (c *Cluster) processPodMonitor(podMonitor PodMonitor) error {
	selector, err := metav1.LabelSelectorAsSelector(&podMonitor.Spec.Selector)
	if err != nil {
		return err
	}
	namespaces := c.getNamespacesForMonitor(podMonitor.Spec.NamespaceSelector, podMonitor.Namespace)

	// Warn about "keep" relabelings: they restrict the target set in ways
	// that are only partially handled here (getEndpointSpec consults
	// MatchRelabeling, but full Prometheus semantics are not implemented).
	for _, endpoint := range podMonitor.Spec.PodMetricsEndpoints {
		for _, relabeling := range endpoint.Relabelings {
			if relabeling.Action != "keep" {
				continue
			}
			fmt.Fprintf(os.Stderr, "WARNING: podmonitor %s/%s contains relabeling which are currently not processed\n", podMonitor.Namespace, podMonitor.Name)
		}
	}

	for _, namespace := range namespaces {
		// Clone before DeleteFunc so the cached pod list is not zeroed.
		pods := slices.DeleteFunc(slices.Clone(c.pods[namespace]), func(pod v1.Pod) bool {
			return !selector.Matches(labels.Set(pod.Labels))
		})
		for _, pod := range pods {
			podkey := pod.Namespace + "/" + pod.Name
			klog.V(3).Infof(" match %s/%s\n", pod.Namespace, pod.Name)
			for _, container := range pod.Spec.Containers {
				for _, endpoint := range podMonitor.Spec.PodMetricsEndpoints {
					ep, err := c.getEndpointSpec(endpoint, &pod, &container, container.Ports)
					if err != nil {
						// TODO generate warning?
						// this container did not match
						continue
					}
					c.endpoints[podkey] = append(c.endpoints[podkey], ep...)
				}
			}
			if len(c.endpoints[podkey]) > 0 {
				klog.V(3).Infof(" POD %s/%s :%v\n", pod.Namespace, pod.Name, c.endpoints[podkey])
			}
		}
	}
	return nil
}

// getNamespacesForMonitor returns the namespaces a monitor applies to: all
// cached namespaces when Any is set, otherwise the explicit MatchNames list,
// falling back to the monitor's own namespace when that list is empty.
func (c *Cluster) getNamespacesForMonitor(namespaceSelector NamespaceSelector, defaultNamespace string) []string {
	namespaces := MapKeys(c.namespaces)
	if !namespaceSelector.Any {
		namespaces = namespaceSelector.MatchNames
		if len(namespaces) == 0 {
			namespaces = []string{defaultNamespace}
		}
	}
	return namespaces
}

// processServiceMonitor resolves one ServiceMonitor: it finds the services the
// monitor selects, then the pods backing each service, and records the
// matching container ports as scrape endpoints in c.endpoints.
func (c *Cluster) processServiceMonitor(serviceMonitor ServiceMonitor) error {
	selector, err := metav1.LabelSelectorAsSelector(&serviceMonitor.Spec.Selector)
	if err != nil {
		return err
	}
	namespaces := c.getNamespacesForMonitor(serviceMonitor.Spec.NamespaceSelector, serviceMonitor.Namespace)

	// Warn about "keep" relabelings (see processPodMonitor for rationale).
	for _, endpoint := range serviceMonitor.Spec.Endpoints {
		for _, relabeling := range endpoint.Relabelings {
			if relabeling.Action != "keep" {
				continue
			}
			fmt.Fprintf(os.Stderr, "WARNING: servicemonitor %s/%s contains relabeling which are currently not processed\n", serviceMonitor.Namespace, serviceMonitor.Name)
		}
	}

	for _, namespace := range namespaces {
		svcs := slices.DeleteFunc(slices.Clone(c.services[namespace]), func(svc v1.Service) bool {
			return !selector.Matches(labels.Set(svc.Labels))
		})
		for _, svc := range svcs {
			klog.V(3).Infof(" match %s/%s\n", svc.Namespace, svc.Name)
			if svc.Spec.Selector == nil {
				fmt.Fprintf(os.Stderr, " service does not have a selector, skipping it, probably configured using endpoints\n")
				continue
			}
			// Now we need to obtain the pod port.
			// 1. Port specified
			//    service targetPort is used (equal to port if not specified)
			//    a. look up pods matched by the service using the service selector
			//    b. path is /metrics or the path defined in the endpoint
			//    c. port and path are defined and pods are known.
			// 2. Target port specified
			//    a, b, c see above.
			svcSelector := labels.SelectorFromSet(labels.Set(svc.Spec.Selector))
			// find the pods backing this service
			pods := slices.DeleteFunc(slices.Clone(c.pods[svc.Namespace]), func(pod v1.Pod) bool {
				return !svcSelector.Matches(labels.Set(pod.Labels))
			})
			for _, endpoint := range serviceMonitor.Spec.Endpoints {
				for _, pod := range pods {
					podkey := pod.Namespace + "/" + pod.Name
					for _, container := range pod.Spec.Containers {
						ep, err := c.getEndpointSpec(endpoint, &pod, &container, container.Ports)
						if err != nil {
							// BUGFIX: previously the error fell through without
							// `continue`; skip this container on error, matching
							// processPodMonitor.
							// TODO: generate warning?
							continue
						}
						c.endpoints[podkey] = append(c.endpoints[podkey], ep...)
					}
					if len(c.endpoints[podkey]) > 0 {
						klog.V(3).Infof(" POD %s/%s :%v\n", pod.Namespace, pod.Name, c.endpoints[podkey])
					}
				}
			}
		}
	}
	return nil
}

// GeneralizedPort is a common abstraction over service ports and container
// ports.
type GeneralizedPort struct {
	Name       string
	TargetPort string
	Protocol   string
}

// MapServicePorts converts service ports to GeneralizedPorts.
func MapServicePorts(ports []v1.ServicePort) []GeneralizedPort {
	return Map(ports, func(p v1.ServicePort) GeneralizedPort {
		return GeneralizedPort{
			Name:       p.Name,
			TargetPort: p.TargetPort.String(),
			Protocol:   string(p.Protocol),
		}
	})
}

// MapConteinerPorts converts container ports to GeneralizedPorts.
// NOTE(review): name has a typo ("Conteiner"); kept for compatibility.
func MapConteinerPorts(ports []v1.ContainerPort) []GeneralizedPort {
	return Map(ports, func(p v1.ContainerPort) GeneralizedPort {
		return GeneralizedPort{
			Name:       p.Name,
			TargetPort: strconv.Itoa(int(p.ContainerPort)),
			Protocol:   string(p.Protocol),
		}
	})
}

// getEndpointSpec returns the monitoring endpoints of a single container that
// match the given monitor endpoint. A container port matches when the
// endpoint's named Port equals the port name, or its TargetPort equals the
// numeric container port, or the endpoint specifies neither; additionally all
// relabeling rules must accept the port (MatchRelabeling).
func (c *Cluster) getEndpointSpec(endpoint Endpoint, pod *v1.Pod, container *v1.Container, ports []v1.ContainerPort) ([]MonitoringEndpoint, error) {
	klog.V(4).Infof(" analyzing %s/%s container %s/%s %v\n", endpoint.Port, endpoint.TargetPort, pod.Name, container.Name, ports)
	res := []MonitoringEndpoint{}
portLoop:
	for _, port := range ports {
		targetPort := ""
		switch {
		case endpoint.Port != "":
			//fmt.Fprintf(os.Stderr, "Checking %v %s\n", port.Port, endpoint.Port)
			if port.Name != endpoint.Port {
				continue
			}
		case endpoint.TargetPort != "":
			if strconv.Itoa(int(port.ContainerPort)) != endpoint.TargetPort {
				continue
			}
		}
		// Prefer the port name as the identifier, else the number.
		if targetPort == "" {
			if port.Name != "" {
				targetPort = port.Name
			} else {
				targetPort = strconv.Itoa(int(port.ContainerPort))
			}
		}
		// check relabeling configs; any non-matching rule excludes this port
		for _, relabeling := range endpoint.Relabelings {
			match, err := MatchRelabeling(relabeling, pod, container, &port)
			if err != nil {
				return nil, err
			}
			if !match {
				continue portLoop
			}
		}
		// port matches all criteria; default path is /metrics
		path := "/metrics"
		if endpoint.Path != "" {
			path = endpoint.Path
		}
		res = append(res, MonitoringEndpoint{
			Port: targetPort,
			Path: path,
		})
	}
	return res, nil
}

// processServiceMonitors lists all ServiceMonitor custom resources in the
// cluster and resolves each one into scrape endpoints.
func (c *Cluster) processServiceMonitors() error {
	fmt.Fprintf(os.Stderr, "Getting service monitors\n")
	serviceMonitorGVR := schema.GroupVersionResource{
		Group:    "monitoring.coreos.com",
		Version:  "v1",
		Resource: "servicemonitors",
	}
	list, err := c.dynClient.Resource(serviceMonitorGVR).
		Namespace("").
		List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, item := range list.Items {
		var sm ServiceMonitor
		// Round-trip through JSON to convert the unstructured object into the
		// typed struct. BUGFIX: the Marshal error was previously discarded.
		bytes, err := json.Marshal(item.Object)
		if err != nil {
			return fmt.Errorf("Error marshalling servicemonitor %v: %w", item.Object, err)
		}
		if err := json.Unmarshal(bytes, &sm); err != nil {
			return fmt.Errorf("Error unmarshalling %s: %w", bytes, err)
		}
		klog.V(2).Infof("Found servicemonitor: %s/%s\n", sm.ObjectMeta.Namespace, sm.ObjectMeta.Name)
		if err := c.processServiceMonitor(sm); err != nil {
			return err
		}
	}
	return nil
}

// procewssPodMonitors lists all PodMonitor custom resources in the cluster and
// resolves each one into scrape endpoints.
// NOTE(review): name has a typo ("procewss"); kept for compatibility within
// the package.
func (c *Cluster) procewssPodMonitors() error {
	fmt.Fprintf(os.Stderr, "Getting pod monitors\n")
	podMonitorGVR := schema.GroupVersionResource{
		Group:    "monitoring.coreos.com",
		Version:  "v1",
		Resource: "podmonitors",
	}
	list, err := c.dynClient.Resource(podMonitorGVR).
		Namespace("").
		List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, item := range list.Items {
		var pm PodMonitor
		// BUGFIX: the Marshal error was previously discarded.
		bytes, err := json.Marshal(item.Object)
		if err != nil {
			return fmt.Errorf("Error marshalling podmonitor %v: %w", item.Object, err)
		}
		if err := json.Unmarshal(bytes, &pm); err != nil {
			return fmt.Errorf("Error unmarshalling %s: %w", bytes, err)
		}
		klog.V(2).Infof("Found podmonitor: %s/%s\n", pm.ObjectMeta.Namespace, pm.ObjectMeta.Name)
		if err := c.processPodMonitor(pm); err != nil {
			return err
		}
	}
	return nil
}

// Pods returns the cached pods of the application's namespace that match the
// application's label selector.
func (c *Cluster) Pods(application *Application) []v1.Pod {
	selector, err := metav1.LabelSelectorAsSelector(application.Selector())
	if err != nil {
		log.Fatalf("Error creating selector: %v", err)
	}
	pods := c.pods[application.Namespace.Name]
	pods = slices.DeleteFunc(slices.Clone(pods), func(pod v1.Pod) bool {
		return !selector.Matches(labels.Set(pod.Labels))
	})
	return pods
}

// ServiceAccounts returns the distinct service-account names used by the
// application's pods, in first-seen order.
func (c *Cluster) ServiceAccounts(application *Application) []string {
	var res []string
	for _, pod := range c.Pods(application) {
		if !slices.Contains(res, pod.Spec.ServiceAccountName) {
			res = append(res, pod.Spec.ServiceAccountName)
		}
	}
	return res
}

// OwnerReferences returns the distinct "Kind/Name" owner references of the
// application's pods, in first-seen order.
func (c *Cluster) OwnerReferences(application *Application) []string {
	var ownerReferences []string
	for _, pod := range c.Pods(application) {
		//log.Printf("  %s %v", pod.Name, pod.OwnerReferences)
		for _, ownerReference := range pod.OwnerReferences {
			owner := ownerReference.Kind + "/" + ownerReference.Name
			if !slices.Contains(ownerReferences, owner) {
				ownerReferences = append(ownerReferences, owner)
			}
		}
	}
	return ownerReferences
}

// IsLinkerdEnabled reports whether linkerd injection is in effect for the
// application: true if any pod opts in, false if every pod opts out (this
// includes the zero-pod case), otherwise the namespace annotation decides.
func (c *Cluster) IsLinkerdEnabled(application *Application) bool {
	pods := c.Pods(application)
	ndisabled := 0
	for _, pod := range pods {
		if pod.Annotations["linkerd.io/inject"] == "enabled" {
			return true
		}
		if pod.Annotations["linkerd.io/inject"] == "disabled" {
			ndisabled++
		}
	}
	if ndisabled == len(pods) {
		return false
	}
	ns := c.namespaces[application.Namespace.Name]
	return ns.Annotations["linkerd.io/inject"] == "enabled"
}

// NamespaceLIst returns all cached namespaces.
// NOTE(review): name has a typo ("LIst"); kept for compatibility.
func (c *Cluster) NamespaceLIst() []v1.Namespace {
	return MapValues(c.namespaces)
}

// Namespace returns the cached namespace with the given name (zero value if
// unknown).
func (c *Cluster) Namespace(name string) v1.Namespace {
	return c.namespaces[name]
}

// PodList returns the cached pods of a namespace.
func (c *Cluster) PodList(namespace string) []v1.Pod {
	return c.pods[namespace]
}

// PortNumbers returns the distinct TCP and UDP container ports of the
// application's pods (excluding the linkerd-proxy sidecar), TCP ports first.
// Returns nil when linkerd is not enabled for the application. Panics on a
// protocol other than TCP/UDP (e.g. SCTP). Order within each protocol group
// follows map iteration and is therefore not deterministic.
func (c *Cluster) PortNumbers(application *Application) []Port {
	if !c.IsLinkerdEnabled(application) {
		return nil
	}
	tcpPorts := make(map[int]Port)
	udpPorts := make(map[int]Port)
	for _, pod := range c.Pods(application) {
		for _, container := range pod.Spec.Containers {
			// The linkerd sidecar's own ports are not application ports.
			if container.Name == "linkerd-proxy" {
				continue
			}
			for _, port := range container.Ports {
				switch port.Protocol {
				case "TCP":
					tcpPorts[int(port.ContainerPort)] = Port{
						Port:     strconv.Itoa(int(port.ContainerPort)),
						Protocol: string(port.Protocol),
					}
				case "UDP":
					udpPorts[int(port.ContainerPort)] = Port{
						Port:     strconv.Itoa(int(port.ContainerPort)),
						Protocol: string(port.Protocol),
					}
				default:
					panic(fmt.Sprintf("Unknown port type for pod %s/%s: %s", pod.Namespace, pod.Name, port.Protocol))
				}
			}
		}
	}
	res := MapValues(tcpPorts)
	res = append(res, MapValues(udpPorts)...)
	return res
}