package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"slices"
	"strconv"
	"strings"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
)
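
// The types below mirror the subset of the Prometheus Operator
// monitoring.coreos.com/v1 ServiceMonitor and PodMonitor custom resources
// that this tool reads; only the fields needed for endpoint discovery are
// declared. Helpers such as MatchRelabeling, Map, MapKeys, MapValues,
// Application and Port are defined elsewhere in this package.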
type ServiceMonitor struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              ServiceMonitorSpec `json:"spec"`
}

type ServiceMonitorSpec struct {
	NamespaceSelector NamespaceSelector    `json:"namespaceSelector"`
	Selector          metav1.LabelSelector `json:"selector"`
	Endpoints         []Endpoint           `json:"endpoints"`
}

type Endpoint struct {
	Path        string          `json:"path,omitempty"`
	Port        string          `json:"port"`
	TargetPort  string          `json:"targetPort"`
	Scheme      string          `json:"scheme,omitempty"`
	Relabelings []RelabelConfig `json:"relabelings,omitempty"`
}

type PodMonitor struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              PodMonitorSpec `json:"spec"`
}

type PodMonitorSpec struct {
	NamespaceSelector   NamespaceSelector    `json:"namespaceSelector"`
	Selector            metav1.LabelSelector `json:"selector"`
	PodMetricsEndpoints []Endpoint           `json:"podMetricsEndpoints"`
}

type NamespaceSelector struct {
	Any        bool     `json:"any,omitempty"`
	MatchNames []string `json:"matchNames,omitempty"`
}

type RelabelConfig struct {
	Action       string   `json:"action,omitempty"`
	SourceLabels []string `json:"sourceLabels,omitempty"`
	Separator    string   `json:"separator,omitempty"`
	TargetLabel  string   `json:"targetLabel,omitempty"`
	Regex        string   `json:"regex,omitempty"`
	Modulus      uint64   `json:"modulus,omitempty"`
	Replacement  string   `json:"replacement,omitempty"`
}
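
// Cluster caches a point-in-time view of the cluster: all namespaces,
// pods and services, plus the scrape endpoints derived from the
// ServiceMonitor and PodMonitor resources.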
type Cluster struct {
	namespaces map[string]v1.Namespace
	clientset  *kubernetes.Clientset
	dynClient  *dynamic.DynamicClient

	// map of namespace -> list of all pods in that namespace
	pods     map[string][]v1.Pod
	services map[string][]v1.Service

	// map of namespace/podname -> monitoring endpoints
	endpoints map[string]MonitoringEndpoints
}

type MonitoringEndpoint struct {
	Port string
	Path string
}

type MonitoringEndpoints []MonitoringEndpoint
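
// CompareTo orders endpoints by port, then by path, so that endpoint
// lists can be sorted and deduplicated deterministically.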
func (s MonitoringEndpoint) CompareTo(e MonitoringEndpoint) int {
	if res := strings.Compare(s.Port, e.Port); res != 0 {
		return res
	}
	return strings.Compare(s.Path, e.Path)
}
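
// Merge appends the given endpoints, then sorts and compacts the result,
// so the receiver ends up as a sorted, duplicate-free union.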
func (s *MonitoringEndpoints) Merge(endpoints MonitoringEndpoints) {
	*s = append(*s, endpoints...)
	slices.SortFunc(*s, func(a, b MonitoringEndpoint) int {
		return a.CompareTo(b)
	})
	*s = slices.CompactFunc(*s, func(a, b MonitoringEndpoint) bool {
		return a.CompareTo(b) == 0
	})
}
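
// NewCluster lists all namespaces, pods and services once, processes
// every ServiceMonitor and PodMonitor, and returns the resulting
// snapshot with a sorted, deduplicated endpoint map.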
func NewCluster(clientset *kubernetes.Clientset, dynClient *dynamic.DynamicClient) (*Cluster, error) {
	cluster := &Cluster{
		clientset: clientset,
		dynClient: dynClient,

		namespaces: make(map[string]v1.Namespace),
		pods:       make(map[string][]v1.Pod),
		services:   make(map[string][]v1.Service),
		endpoints:  make(map[string]MonitoringEndpoints),
	}
	nslist, err := cluster.clientset.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	fmt.Fprintf(os.Stderr, "Getting pods, services\n")
	for _, ns := range nslist.Items {
		cluster.namespaces[ns.Name] = ns
	}
	podList, err := cluster.clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	for _, pod := range podList.Items {
		cluster.pods[pod.Namespace] = append(cluster.pods[pod.Namespace], pod)
	}

	svcList, err := cluster.clientset.CoreV1().Services("").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	for _, svc := range svcList.Items {
		cluster.services[svc.Namespace] = append(cluster.services[svc.Namespace], svc)
	}
	err = cluster.processServiceMonitors()
	if err != nil {
		return nil, err
	}
	err = cluster.processPodMonitors()
	if err != nil {
		return nil, err
	}

	// Sort and deduplicate the endpoints collected for each pod.
	podkeys := MapKeys(cluster.endpoints)
	slices.Sort(podkeys)
	for _, pod := range podkeys {
		monitors := cluster.endpoints[pod]
		slices.SortFunc(monitors,
			func(m1, m2 MonitoringEndpoint) int {
				return m1.CompareTo(m2)
			})
		monitors = slices.CompactFunc(monitors,
			func(m1, m2 MonitoringEndpoint) bool {
				return m1.CompareTo(m2) == 0
			})
		cluster.endpoints[pod] = monitors
		fmt.Fprintf(os.Stderr, "monitoring endpoint: %s: %v\n", pod, monitors)
	}

	return cluster, nil
}
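
// processPodMonitor records a scrape endpoint for every container port
// that matches the PodMonitor's selector, namespaces and endpoints.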
func (c *Cluster) processPodMonitor(podMonitor PodMonitor) error {
	selector, err := metav1.LabelSelectorAsSelector(&podMonitor.Spec.Selector)
	if err != nil {
		return err
	}

	namespaces := c.getNamespacesForMonitor(podMonitor.Spec.NamespaceSelector, podMonitor.Namespace)
	// Only "keep" relabelings are evaluated (see getEndpointSpec); warn
	// about any other action, since it is not applied.
	for _, endpoint := range podMonitor.Spec.PodMetricsEndpoints {
		for _, relabeling := range endpoint.Relabelings {
			if relabeling.Action == "keep" {
				continue
			}
			fmt.Fprintf(os.Stderr, "WARNING: podmonitor %s/%s contains relabelings which are currently not processed\n",
				podMonitor.Namespace, podMonitor.Name)
		}
	}
	for _, namespace := range namespaces {
		pods := slices.DeleteFunc(slices.Clone(c.pods[namespace]), func(pod v1.Pod) bool {
			return !selector.Matches(labels.Set(pod.Labels))
		})
		for _, pod := range pods {
			podkey := pod.Namespace + "/" + pod.Name
			klog.V(3).Infof(" match %s/%s\n", pod.Namespace, pod.Name)
			for _, container := range pod.Spec.Containers {
				for _, endpoint := range podMonitor.Spec.PodMetricsEndpoints {
					ep, err := c.getEndpointSpec(endpoint, &pod, &container, container.Ports)
					if err != nil {
						// TODO generate warning?
						// this container did not match
						continue
					}
					c.endpoints[podkey] = append(c.endpoints[podkey], ep...)
				}
			}
			if len(c.endpoints[podkey]) > 0 {
				klog.V(3).Infof(" POD %s/%s :%v\n", pod.Namespace, pod.Name,
					c.endpoints[podkey])
			}
		}
	}
	return nil
}
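
// getNamespacesForMonitor resolves a monitor's namespaceSelector: all
// namespaces when any is true, otherwise the listed matchNames, falling
// back to the monitor's own namespace when no names are given.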
func (c *Cluster) getNamespacesForMonitor(namespaceSelector NamespaceSelector, defaultNamespace string) []string {
	namespaces := MapKeys(c.namespaces)
	if !namespaceSelector.Any {
		namespaces = namespaceSelector.MatchNames
		if len(namespaces) == 0 {
			namespaces = []string{defaultNamespace}
		}
	}
	return namespaces
}
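
// processServiceMonitor resolves a ServiceMonitor to concrete pods: it
// finds the services matching the monitor's selector, then the pods
// backing each service, and records a scrape endpoint per matching
// container port.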
func (c *Cluster) processServiceMonitor(serviceMonitor ServiceMonitor) error {
	selector, err := metav1.LabelSelectorAsSelector(&serviceMonitor.Spec.Selector)
	if err != nil {
		return err
	}

	namespaces := c.getNamespacesForMonitor(serviceMonitor.Spec.NamespaceSelector,
		serviceMonitor.Namespace)

	// Only "keep" relabelings are evaluated (see getEndpointSpec); warn
	// about any other action, since it is not applied.
	for _, endpoint := range serviceMonitor.Spec.Endpoints {
		for _, relabeling := range endpoint.Relabelings {
			if relabeling.Action == "keep" {
				continue
			}
			fmt.Fprintf(os.Stderr, "WARNING: servicemonitor %s/%s contains relabelings which are currently not processed\n",
				serviceMonitor.Namespace, serviceMonitor.Name)
		}
	}
	for _, namespace := range namespaces {
		svcs := slices.DeleteFunc(slices.Clone(c.services[namespace]), func(svc v1.Service) bool {
			return !selector.Matches(labels.Set(svc.Labels))
		})
		for _, svc := range svcs {
			klog.V(3).Infof(" match %s/%s\n", svc.Namespace, svc.Name)
			if svc.Spec.Selector == nil {
				fmt.Fprintf(os.Stderr, " service does not have a selector, skipping it, probably configured using endpoints\n")
				continue
			}
			// Now we need to obtain the pod port.
			// 1. Port specified:
			//    the service targetPort is used (equal to port if not specified)
			//    a. look up the pods matched by the service using the service selector
			//    b. the path is /metrics or the path defined on the endpoint
			//    c. port and path are defined and the pods are known
			// 2. Target port specified:
			//    a, b, c as above.
			svcSelector := labels.SelectorFromSet(labels.Set(svc.Spec.Selector))
			// find pods
			pods := slices.DeleteFunc(slices.Clone(c.pods[svc.Namespace]), func(pod v1.Pod) bool {
				return !svcSelector.Matches(labels.Set(pod.Labels))
			})
			for _, endpoint := range serviceMonitor.Spec.Endpoints {
				for _, pod := range pods {
					podkey := pod.Namespace + "/" + pod.Name
					for _, container := range pod.Spec.Containers {
						ep, err := c.getEndpointSpec(endpoint, &pod, &container, container.Ports)
						if err != nil {
							// TODO: generate warning?
							// this service did not match
							continue
						}

						c.endpoints[podkey] = append(c.endpoints[podkey], ep...)
					}
					if len(c.endpoints[podkey]) > 0 {
						klog.V(3).Infof(" POD %s/%s :%v\n", pod.Namespace, pod.Name,
							c.endpoints[podkey])
					}
				}
			}
		}
	}
	return nil
}
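
// GeneralizedPort is a common view of a service or container port, so
// that both can be compared through one representation.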
type GeneralizedPort struct {
	Name       string
	TargetPort string
	Protocol   string
}

func MapServicePorts(ports []v1.ServicePort) []GeneralizedPort {
	return Map(ports, func(p v1.ServicePort) GeneralizedPort {
		return GeneralizedPort{
			Name:       p.Name,
			TargetPort: p.TargetPort.String(),
			Protocol:   string(p.Protocol),
		}
	})
}

func MapContainerPorts(ports []v1.ContainerPort) []GeneralizedPort {
	return Map(ports, func(p v1.ContainerPort) GeneralizedPort {
		return GeneralizedPort{
			Name:       p.Name,
			TargetPort: strconv.Itoa(int(p.ContainerPort)),
			Protocol:   string(p.Protocol),
		}
	})
}
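
// getEndpointSpec returns the scrape endpoints one monitor endpoint
// yields for one container: each container port matching the endpoint's
// port (by name) or targetPort (by number), plus all of its "keep"
// relabelings, produces one MonitoringEndpoint scraping endpoint.Path,
// or /metrics by default. When neither port nor targetPort is set,
// every container port matches.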
func (c *Cluster) getEndpointSpec(endpoint Endpoint, pod *v1.Pod, container *v1.Container, ports []v1.ContainerPort) ([]MonitoringEndpoint, error) {
	klog.V(4).Infof(" analyzing %s/%s container %s/%s %v\n",
		endpoint.Port, endpoint.TargetPort, pod.Name, container.Name, ports)
	res := []MonitoringEndpoint{}
portLoop:
	for _, port := range ports {
		switch {
		case endpoint.Port != "":
			// the endpoint references the port by name
			if port.Name != endpoint.Port {
				continue
			}
		case endpoint.TargetPort != "":
			// the endpoint references the port by number
			if strconv.Itoa(int(port.ContainerPort)) != endpoint.TargetPort {
				continue
			}
		}
		// prefer the port name; fall back to the port number
		targetPort := port.Name
		if targetPort == "" {
			targetPort = strconv.Itoa(int(port.ContainerPort))
		}
		// check relabeling configs
		for _, relabeling := range endpoint.Relabelings {
			match, err := MatchRelabeling(relabeling, pod, container, &port)
			if err != nil {
				return nil, err
			}
			if !match {
				continue portLoop
			}
		}
		// port matches all criteria
		path := "/metrics"
		if endpoint.Path != "" {
			path = endpoint.Path
		}
		res = append(res, MonitoringEndpoint{
			Port: targetPort,
			Path: path,
		})
	}
	return res, nil
}
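
// processServiceMonitors lists all ServiceMonitor resources through the
// dynamic client and feeds each one to processServiceMonitor. The CRD is
// decoded by round-tripping the unstructured object through JSON.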
func (c *Cluster) processServiceMonitors() error {
	fmt.Fprintf(os.Stderr, "Getting service monitors\n")
	serviceMonitorGVR := schema.GroupVersionResource{
		Group:    "monitoring.coreos.com",
		Version:  "v1",
		Resource: "servicemonitors",
	}
	list, err := c.dynClient.Resource(serviceMonitorGVR).
		Namespace("").
		List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, item := range list.Items {
		var sm ServiceMonitor
		bytes, err := json.Marshal(item.Object)
		if err != nil {
			return err
		}
		if err := json.Unmarshal(bytes, &sm); err != nil {
			return fmt.Errorf("error unmarshalling %s: %w", bytes, err)
		}
		klog.V(2).Infof("Found servicemonitor: %s/%s\n", sm.ObjectMeta.Namespace, sm.ObjectMeta.Name)
		if err := c.processServiceMonitor(sm); err != nil {
			return err
		}
	}
	return nil
}
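
// processPodMonitors is the PodMonitor counterpart of
// processServiceMonitors.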
func (c *Cluster) processPodMonitors() error {
	fmt.Fprintf(os.Stderr, "Getting pod monitors\n")
	podMonitorGVR := schema.GroupVersionResource{
		Group:    "monitoring.coreos.com",
		Version:  "v1",
		Resource: "podmonitors",
	}
	list, err := c.dynClient.Resource(podMonitorGVR).
		Namespace("").
		List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, item := range list.Items {
		var pm PodMonitor
		bytes, err := json.Marshal(item.Object)
		if err != nil {
			return err
		}
		if err := json.Unmarshal(bytes, &pm); err != nil {
			return fmt.Errorf("error unmarshalling %s: %w", bytes, err)
		}
		klog.V(2).Infof("Found podmonitor: %s/%s\n", pm.ObjectMeta.Namespace, pm.ObjectMeta.Name)
		if err := c.processPodMonitor(pm); err != nil {
			return err
		}
	}
	return nil
}
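
// Pods returns the pods in the application's namespace that match the
// application's label selector.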
func (c *Cluster) Pods(application *Application) []v1.Pod {
	selector, err := metav1.LabelSelectorAsSelector(application.Selector())
	if err != nil {
		log.Fatalf("Error creating selector: %v", err)
	}
	pods := c.pods[application.Namespace.Name]
	pods = slices.DeleteFunc(slices.Clone(pods), func(pod v1.Pod) bool {
		return !selector.Matches(labels.Set(pod.Labels))
	})
	return pods
}

func (c *Cluster) ServiceAccounts(application *Application) []string {
	var res []string
	for _, pod := range c.Pods(application) {
		if !slices.Contains(res, pod.Spec.ServiceAccountName) {
			res = append(res, pod.Spec.ServiceAccountName)
		}
	}
	return res
}

func (c *Cluster) OwnerReferences(application *Application) []string {
	var ownerReferences []string
	for _, pod := range c.Pods(application) {
		for _, ownerReference := range pod.OwnerReferences {
			owner := ownerReference.Kind + "/" + ownerReference.Name
			if !slices.Contains(ownerReferences, owner) {
				ownerReferences = append(ownerReferences, owner)
			}
		}
	}
	return ownerReferences
}
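
// IsLinkerdEnabled reports whether the application's pods are injected
// with the Linkerd sidecar: true if any pod opts in via the
// linkerd.io/inject annotation, false if all pods opt out, and otherwise
// whatever the namespace-level annotation says.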
func (c *Cluster) IsLinkerdEnabled(application *Application) bool {
	pods := c.Pods(application)

	ndisabled := 0
	for _, pod := range pods {
		if pod.Annotations["linkerd.io/inject"] == "enabled" {
			return true
		}
		if pod.Annotations["linkerd.io/inject"] == "disabled" {
			ndisabled++
		}
	}
	if ndisabled == len(pods) {
		return false
	}
	ns := c.namespaces[application.Namespace.Name]
	return ns.Annotations["linkerd.io/inject"] == "enabled"
}

func (c *Cluster) NamespaceList() []v1.Namespace {
	return MapValues(c.namespaces)
}

func (c *Cluster) Namespace(name string) v1.Namespace {
	return c.namespaces[name]
}

func (c *Cluster) PodList(namespace string) []v1.Pod {
	return c.pods[namespace]
}
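
// PortNumbers collects the distinct TCP and UDP container ports of the
// application's pods, excluding the linkerd-proxy sidecar; it returns
// nil when Linkerd is not enabled for the application.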
func (c *Cluster) PortNumbers(application *Application) []Port {
	if !c.IsLinkerdEnabled(application) {
		return nil
	}
	tcpPorts := make(map[int]Port)
	udpPorts := make(map[int]Port)
	for _, pod := range c.Pods(application) {
		for _, container := range pod.Spec.Containers {
			if container.Name == "linkerd-proxy" {
				continue
			}
			for _, port := range container.Ports {
				switch port.Protocol {
				case "TCP":
					tcpPorts[int(port.ContainerPort)] = Port{
						Port:     strconv.Itoa(int(port.ContainerPort)),
						Protocol: string(port.Protocol),
					}
				case "UDP":
					udpPorts[int(port.ContainerPort)] = Port{
						Port:     strconv.Itoa(int(port.ContainerPort)),
						Protocol: string(port.Protocol),
					}
				default:
					panic(fmt.Sprintf("Unknown port type for pod %s/%s: %s",
						pod.Namespace, pod.Name, port.Protocol))
				}
			}
		}
	}
	res := MapValues(tcpPorts)
	res = append(res, MapValues(udpPorts)...)
	return res
}