now processing relabeling rules for prometheus
obtaining all required monitoring endpoints now seems to work.
This commit is contained in:
		
							parent
							
								
									2bea96cc57
								
							
						
					
					
						commit
						2373f428bb
					
				| @ -6,40 +6,409 @@ import ( | |||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/labels" | 	"k8s.io/apimachinery/pkg/labels" | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||||
|  | 	"k8s.io/apimachinery/pkg/util/json" | ||||||
|  | 	"k8s.io/client-go/dynamic" | ||||||
| 	"k8s.io/client-go/kubernetes" | 	"k8s.io/client-go/kubernetes" | ||||||
|  | 	"k8s.io/klog/v2" | ||||||
| 	"log" | 	"log" | ||||||
|  | 	"os" | ||||||
| 	"slices" | 	"slices" | ||||||
| 	"strconv" | 	"strconv" | ||||||
|  | 	"strings" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | type ServiceMonitor struct { | ||||||
|  | 	metav1.TypeMeta   `json:",inline"` | ||||||
|  | 	metav1.ObjectMeta `json:"metadata,omitempty"` | ||||||
|  | 	Spec              ServiceMonitorSpec `json:"spec"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type ServiceMonitorSpec struct { | ||||||
|  | 	NamespaceSelector NamespaceSelector    `json:"namespaceSelector"` | ||||||
|  | 	Selector          metav1.LabelSelector `json:"selector"` | ||||||
|  | 	Endpoints         []Endpoint           `json:"endpoints"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type Endpoint struct { | ||||||
|  | 	Path        string          `json:"path,omitempty"` | ||||||
|  | 	Port        string          `json:"port"` | ||||||
|  | 	TargetPort  string          `json:"targetPort"` | ||||||
|  | 	Scheme      string          `json:"scheme,omitempty"` | ||||||
|  | 	Relabelings []RelabelConfig `json:"relabelings,omitempty"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type PodMonitor struct { | ||||||
|  | 	metav1.TypeMeta   `json:",inline"` | ||||||
|  | 	metav1.ObjectMeta `json:"metadata,omitempty"` | ||||||
|  | 	Spec              PodMonitorSpec `json:"spec"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type PodMonitorSpec struct { | ||||||
|  | 	NamespaceSelector   NamespaceSelector    `json:"namespaceSelector"` | ||||||
|  | 	Selector            metav1.LabelSelector `json:"selector"` | ||||||
|  | 	PodMetricsEndpoints []Endpoint           `json:"podMetricsEndpoints"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type NamespaceSelector struct { | ||||||
|  | 	Any        bool     `json:"any,omitempty"` | ||||||
|  | 	MatchNames []string `json:"matchNames,omitempty"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type RelabelConfig struct { | ||||||
|  | 	Action       string   `json:"action,omitempty"` | ||||||
|  | 	SourceLabels []string `json:"sourceLabels,omitempty"` | ||||||
|  | 	Separator    string   `json:"separator,omitempty"` | ||||||
|  | 	TargetLabel  string   `json:"targetLabel,omitempty"` | ||||||
|  | 	Regex        string   `json:"regex,omitempty"` | ||||||
|  | 	Modulus      uint64   `json:"modulus,omitempty"` | ||||||
|  | 	Replacement  string   `json:"replacement,omitempty"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type Cluster struct { | type Cluster struct { | ||||||
| 	namespaces map[string]v1.Namespace | 	namespaces map[string]v1.Namespace | ||||||
| 	clientset  *kubernetes.Clientset | 	clientset  *kubernetes.Clientset | ||||||
|  | 	dynClient  *dynamic.DynamicClient | ||||||
| 	// map of namespace to list of all pods
 | 	// map of namespace to list of all pods
 | ||||||
| 	pods     map[string][]v1.Pod | 	pods     map[string][]v1.Pod | ||||||
|  | 	services map[string][]v1.Service | ||||||
|  | 
 | ||||||
|  | 	// map of   namespace/podname ->: ScrapingEndpoints
 | ||||||
|  | 	endpoints map[string]MonitoringEndpoints | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewCluster(clientset *kubernetes.Clientset) (*Cluster, error) { | type MonitoringEndpoint struct { | ||||||
|  | 	Port string | ||||||
|  | 	Path string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type MonitoringEndpoints []MonitoringEndpoint | ||||||
|  | 
 | ||||||
|  | func (s MonitoringEndpoint) CompareTo(e MonitoringEndpoint) int { | ||||||
|  | 	if res := strings.Compare(s.Port, e.Port); res != 0 { | ||||||
|  | 		return res | ||||||
|  | 	} | ||||||
|  | 	return strings.Compare(s.Path, e.Path) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *MonitoringEndpoints) Merge(endpoints MonitoringEndpoints) { | ||||||
|  | 	*s = append(*s, endpoints...) | ||||||
|  | 	slices.SortFunc(*s, func(a, b MonitoringEndpoint) int { | ||||||
|  | 		return a.CompareTo(b) | ||||||
|  | 	}) | ||||||
|  | 	*s = slices.CompactFunc(*s, func(a, b MonitoringEndpoint) bool { | ||||||
|  | 		return a.CompareTo(b) == 0 | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewCluster(clientset *kubernetes.Clientset, dynClient *dynamic.DynamicClient) (*Cluster, error) { | ||||||
| 	cluster := &Cluster{ | 	cluster := &Cluster{ | ||||||
| 		clientset: clientset, | 		clientset: clientset, | ||||||
|  | 		dynClient: dynClient, | ||||||
|  | 
 | ||||||
| 		namespaces: make(map[string]v1.Namespace), | 		namespaces: make(map[string]v1.Namespace), | ||||||
| 		pods:       make(map[string][]v1.Pod), | 		pods:       make(map[string][]v1.Pod), | ||||||
|  | 		services:   make(map[string][]v1.Service), | ||||||
|  | 		endpoints:  make(map[string]MonitoringEndpoints), | ||||||
| 	} | 	} | ||||||
| 	nslist, err := cluster.clientset.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) | 	nslist, err := cluster.clientset.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	fmt.Fprintf(os.Stderr, "Getting pods, services\n") | ||||||
| 	for _, ns := range nslist.Items { | 	for _, ns := range nslist.Items { | ||||||
| 		cluster.namespaces[ns.Name] = ns | 		cluster.namespaces[ns.Name] = ns | ||||||
| 		podList, err := cluster.clientset.CoreV1().Pods(ns.Name).List(context.Background(), metav1.ListOptions{}) | 	} | ||||||
|  | 	podList, err := cluster.clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 		cluster.pods[ns.Name] = podList.Items | 	for _, pod := range podList.Items { | ||||||
|  | 		cluster.pods[pod.Namespace] = append(cluster.pods[pod.Namespace], pod) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	svcList, err := cluster.clientset.CoreV1().Services("").List(context.Background(), metav1.ListOptions{}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	for _, svc := range svcList.Items { | ||||||
|  | 		cluster.services[svc.Namespace] = append(cluster.services[svc.Namespace], svc) | ||||||
|  | 	} | ||||||
|  | 	err = cluster.processServiceMonitors() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	err = cluster.procewssPodMonitors() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	podkeys := MapKeys(cluster.endpoints) | ||||||
|  | 	slices.Sort(podkeys) | ||||||
|  | 	for _, pod := range podkeys { | ||||||
|  | 		monitors := cluster.endpoints[pod] | ||||||
|  | 		slices.SortFunc(monitors, | ||||||
|  | 			func(m1, m2 MonitoringEndpoint) int { | ||||||
|  | 				return m1.CompareTo(m2) | ||||||
|  | 			}) | ||||||
|  | 		monitors = slices.CompactFunc(monitors, | ||||||
|  | 			func(m1, m2 MonitoringEndpoint) bool { | ||||||
|  | 				return m1.CompareTo(m2) == 0 | ||||||
|  | 			}) | ||||||
|  | 		cluster.endpoints[pod] = monitors | ||||||
|  | 		fmt.Fprintf(os.Stderr, "monitoring endpoint: %s: %v\n", pod, monitors) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return cluster, nil | 	return cluster, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *Cluster) processPodMonitor(podMonitor PodMonitor) error { | ||||||
|  | 	selector, err := metav1.LabelSelectorAsSelector(&podMonitor.Spec.Selector) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	namespaces := c.getNamespacesForMonitor(podMonitor.Spec.NamespaceSelector, podMonitor.Namespace) | ||||||
|  | 	for _, endpoint := range podMonitor.Spec.PodMetricsEndpoints { | ||||||
|  | 		for _, relabeling := range endpoint.Relabelings { | ||||||
|  | 			if relabeling.Action != "keep" { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			fmt.Fprintf(os.Stderr, "WARNING: podmonitor %s/%s contains relabeling which are currently not processed\n", | ||||||
|  | 				podMonitor.Namespace, podMonitor.Name) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	for _, namespace := range namespaces { | ||||||
|  | 		pods := slices.DeleteFunc(slices.Clone(c.pods[namespace]), func(pod v1.Pod) bool { | ||||||
|  | 			return !selector.Matches(labels.Set(pod.Labels)) | ||||||
|  | 		}) | ||||||
|  | 		for _, pod := range pods { | ||||||
|  | 			podkey := pod.Namespace + "/" + pod.Name | ||||||
|  | 			klog.V(3).Infof("  match %s/%s\n", pod.Namespace, pod.Name) | ||||||
|  | 			for _, container := range pod.Spec.Containers { | ||||||
|  | 				for _, endpoint := range podMonitor.Spec.PodMetricsEndpoints { | ||||||
|  | 					ep, err := c.getEndpointSpec(endpoint, &pod, &container, container.Ports) | ||||||
|  | 					if err != nil { | ||||||
|  | 						// TODO generate warning?
 | ||||||
|  | 						// this container did not match
 | ||||||
|  | 						continue | ||||||
|  | 					} | ||||||
|  | 					c.endpoints[podkey] = append(c.endpoints[podkey], ep...) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			if len(c.endpoints[podkey]) > 0 { | ||||||
|  | 				klog.V(3).Infof("    POD %s/%s :%v\n", pod.Namespace, pod.Name, | ||||||
|  | 					c.endpoints[podkey]) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) getNamespacesForMonitor(namespaceSelector NamespaceSelector, defaultNamespace string) []string { | ||||||
|  | 	namespaces := MapKeys(c.namespaces) | ||||||
|  | 	if !namespaceSelector.Any { | ||||||
|  | 		namespaces = namespaceSelector.MatchNames | ||||||
|  | 		if len(namespaces) == 0 { | ||||||
|  | 			namespaces = []string{defaultNamespace} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return namespaces | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) processServiceMonitor(serviceMonitor ServiceMonitor) error { | ||||||
|  | 	selector, err := metav1.LabelSelectorAsSelector(&serviceMonitor.Spec.Selector) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	namespaces := c.getNamespacesForMonitor(serviceMonitor.Spec.NamespaceSelector, | ||||||
|  | 		serviceMonitor.Namespace) | ||||||
|  | 
 | ||||||
|  | 	for _, endpoint := range serviceMonitor.Spec.Endpoints { | ||||||
|  | 		for _, relabeling := range endpoint.Relabelings { | ||||||
|  | 			if relabeling.Action != "keep" { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			fmt.Fprintf(os.Stderr, "WARNING: servicemonitor %s/%s contains relabeling which are currently not processed\n", | ||||||
|  | 				serviceMonitor.Namespace, serviceMonitor.Name) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	for _, namespace := range namespaces { | ||||||
|  | 		svcs := slices.DeleteFunc(slices.Clone(c.services[namespace]), func(svc v1.Service) bool { | ||||||
|  | 			return !selector.Matches(labels.Set(svc.Labels)) | ||||||
|  | 		}) | ||||||
|  | 		for _, svc := range svcs { | ||||||
|  | 			klog.V(3).Infof("  match %s/%s\n", svc.Namespace, svc.Name) | ||||||
|  | 			if svc.Spec.Selector == nil { | ||||||
|  | 				fmt.Fprintf(os.Stderr, "  service does not have a selector, skippingit, probably configured using endpoints\n") | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			// Now we need to obtain the pod port.
 | ||||||
|  | 			// 1. Port specified
 | ||||||
|  | 			//      service targetPort is used (equal to port if not specified)
 | ||||||
|  | 			//   a. lookup pods matched by the service using the service selector
 | ||||||
|  | 			//   b. path is /metics or the metric defined in the service selector
 | ||||||
|  | 			//   c. port, path is defined and poss are known.
 | ||||||
|  | 			// 2. Target port specified
 | ||||||
|  | 			//   a, b, c see above.
 | ||||||
|  | 			svcSelector := labels.SelectorFromSet(labels.Set(svc.Spec.Selector)) | ||||||
|  | 			// find pods
 | ||||||
|  | 			pods := slices.DeleteFunc(slices.Clone(c.pods[svc.Namespace]), func(pod v1.Pod) bool { | ||||||
|  | 				return !svcSelector.Matches(labels.Set(pod.Labels)) | ||||||
|  | 			}) | ||||||
|  | 			for _, endpoint := range serviceMonitor.Spec.Endpoints { | ||||||
|  | 				for _, pod := range pods { | ||||||
|  | 					podkey := pod.Namespace + "/" + pod.Name | ||||||
|  | 					for _, container := range pod.Spec.Containers { | ||||||
|  | 						ep, err := c.getEndpointSpec(endpoint, &pod, &container, container.Ports) | ||||||
|  | 						if err != nil { | ||||||
|  | 							// TODO: generate warning?
 | ||||||
|  | 							// this wervice did not match
 | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						c.endpoints[podkey] = append(c.endpoints[podkey], ep...) | ||||||
|  | 					} | ||||||
|  | 					if len(c.endpoints[podkey]) > 0 { | ||||||
|  | 						klog.V(3).Infof("    POD %s/%s :%v\n", pod.Namespace, pod.Name, | ||||||
|  | 							c.endpoints[podkey]) | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type GeneralizedPort struct { | ||||||
|  | 	Name       string | ||||||
|  | 	TargetPort string | ||||||
|  | 	Protocol   string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func MapServicePorts(ports []v1.ServicePort) []GeneralizedPort { | ||||||
|  | 	return Map(ports, func(p v1.ServicePort) GeneralizedPort { | ||||||
|  | 		return GeneralizedPort{ | ||||||
|  | 			Name:       p.Name, | ||||||
|  | 			TargetPort: p.TargetPort.String(), | ||||||
|  | 			Protocol:   string(p.Protocol), | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func MapConteinerPorts(ports []v1.ContainerPort) []GeneralizedPort { | ||||||
|  | 	return Map(ports, func(p v1.ContainerPort) GeneralizedPort { | ||||||
|  | 		return GeneralizedPort{ | ||||||
|  | 			Name:       p.Name, | ||||||
|  | 			TargetPort: strconv.Itoa(int(p.ContainerPort)), | ||||||
|  | 			Protocol:   string(p.Protocol), | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) getEndpointSpec(endpoint Endpoint, pod *v1.Pod, container *v1.Container, ports []v1.ContainerPort) ([]MonitoringEndpoint, error) { | ||||||
|  | 	klog.V(4).Infof("    analyzing %s/%s container %s/%s %v\n", | ||||||
|  | 		endpoint.Port, endpoint.TargetPort, pod.Name, container.Name, ports) | ||||||
|  | 	res := []MonitoringEndpoint{} | ||||||
|  | portLoop: | ||||||
|  | 	for _, port := range ports { | ||||||
|  | 		targetPort := "" | ||||||
|  | 		switch { | ||||||
|  | 		case endpoint.Port != "": | ||||||
|  | 			//fmt.Fprintf(os.Stderr, "Checking %v %s\n", port.Port, endpoint.Port)
 | ||||||
|  | 			if port.Name != endpoint.Port { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		case endpoint.TargetPort != "": | ||||||
|  | 			if strconv.Itoa(int(port.ContainerPort)) != endpoint.TargetPort { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if targetPort == "" { | ||||||
|  | 			if port.Name != "" { | ||||||
|  | 				targetPort = port.Name | ||||||
|  | 			} else { | ||||||
|  | 				targetPort = strconv.Itoa(int(port.ContainerPort)) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		// check relabeling configs
 | ||||||
|  | 		for _, relabeling := range endpoint.Relabelings { | ||||||
|  | 			match, err := MatchRelabeling(relabeling, pod, container, &port) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  | 			if !match { | ||||||
|  | 				continue portLoop | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		// port matches all criteria
 | ||||||
|  | 		path := "/metrics" | ||||||
|  | 		if endpoint.Path != "" { | ||||||
|  | 			path = endpoint.Path | ||||||
|  | 		} | ||||||
|  | 		ep := MonitoringEndpoint{ | ||||||
|  | 			Port: targetPort, | ||||||
|  | 			Path: path, | ||||||
|  | 		} | ||||||
|  | 		res = append(res, ep) | ||||||
|  | 	} | ||||||
|  | 	return res, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) processServiceMonitors() error { | ||||||
|  | 	fmt.Fprintf(os.Stderr, "Getting service monitors\n") | ||||||
|  | 	serviceMonitorGVR := schema.GroupVersionResource{ | ||||||
|  | 		Group:    "monitoring.coreos.com", | ||||||
|  | 		Version:  "v1", | ||||||
|  | 		Resource: "servicemonitors", | ||||||
|  | 	} | ||||||
|  | 	list, err := c.dynClient.Resource(serviceMonitorGVR). | ||||||
|  | 		Namespace(""). | ||||||
|  | 		List(context.Background(), metav1.ListOptions{}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	for _, item := range list.Items { | ||||||
|  | 		var sm ServiceMonitor | ||||||
|  | 		bytes, _ := json.Marshal(item.Object) | ||||||
|  | 		if err := json.Unmarshal(bytes, &sm); err != nil { | ||||||
|  | 			return fmt.Errorf("Error unmarshalling %s: %w", bytes, err) | ||||||
|  | 		} | ||||||
|  | 		klog.V(2).Infof("Found servicemonitor: %s/%s\n", sm.ObjectMeta.Namespace, sm.ObjectMeta.Name) | ||||||
|  | 		if err := c.processServiceMonitor(sm); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) procewssPodMonitors() error { | ||||||
|  | 	fmt.Fprintf(os.Stderr, "Getting pod monitors\n") | ||||||
|  | 	podMonitorGVR := schema.GroupVersionResource{ | ||||||
|  | 		Group:    "monitoring.coreos.com", | ||||||
|  | 		Version:  "v1", | ||||||
|  | 		Resource: "podmonitors", | ||||||
|  | 	} | ||||||
|  | 	list, err := c.dynClient.Resource(podMonitorGVR). | ||||||
|  | 		Namespace(""). | ||||||
|  | 		List(context.Background(), metav1.ListOptions{}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	for _, item := range list.Items { | ||||||
|  | 		var pm PodMonitor | ||||||
|  | 		bytes, _ := json.Marshal(item.Object) | ||||||
|  | 		if err := json.Unmarshal(bytes, &pm); err != nil { | ||||||
|  | 			return fmt.Errorf("Error unmarshalling %s: %w", bytes, err) | ||||||
|  | 		} | ||||||
|  | 		klog.V(2).Infof("Found podmonitor: %s/%s\n", pm.ObjectMeta.Namespace, pm.ObjectMeta.Name) | ||||||
|  | 		if err := c.processPodMonitor(pm); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *Cluster) Pods(application *Application) []v1.Pod { | func (c *Cluster) Pods(application *Application) []v1.Pod { | ||||||
| 	selector, err := metav1.LabelSelectorAsSelector(application.Selector()) | 	selector, err := metav1.LabelSelectorAsSelector(application.Selector()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | |||||||
| @ -35,13 +35,13 @@ func IterToSlice[K any](i iter.Seq[K]) []K { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func validate(files []string, options *Options) error { | func validate(files []string, options *Options) error { | ||||||
| 	clientset, _ := GetKubernetesConnection() | 	clientset, dynClient, _ := GetKubernetesConnection() | ||||||
| 	config, err := readConfig(files) | 	config, err := readConfig(files) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	cluster, err := NewCluster(clientset) | 	cluster, err := NewCluster(clientset, dynClient) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | |||||||
| @ -1,12 +1,13 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"k8s.io/client-go/dynamic" | ||||||
| 	"k8s.io/client-go/kubernetes" | 	"k8s.io/client-go/kubernetes" | ||||||
| 	"k8s.io/client-go/tools/clientcmd" | 	"k8s.io/client-go/tools/clientcmd" | ||||||
| 	"log" | 	"log" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func GetKubernetesConnection() (*kubernetes.Clientset, string) { | func GetKubernetesConnection() (*kubernetes.Clientset, *dynamic.DynamicClient, string) { | ||||||
| 	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() | 	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() | ||||||
| 	configOverrides := &clientcmd.ConfigOverrides{} | 	configOverrides := &clientcmd.ConfigOverrides{} | ||||||
| 	kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) | 	kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) | ||||||
| @ -20,12 +21,16 @@ func GetKubernetesConnection() (*kubernetes.Clientset, string) { | |||||||
| 
 | 
 | ||||||
| 	clientset, err := kubernetes.NewForConfig(config) | 	clientset, err := kubernetes.NewForConfig(config) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Panicln(err.Error()) | 		panic(err) | ||||||
|  | 	} | ||||||
|  | 	dynamicClient, err := dynamic.NewForConfig(config) | ||||||
|  | 	if err != nil { | ||||||
|  | 		panic(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	namespace, _, err := kubeConfig.Namespace() | 	namespace, _, err := kubeConfig.Namespace() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Panicf("Could not get namespace") | 		log.Panicf("Could not get namespace") | ||||||
| 	} | 	} | ||||||
| 	return clientset, namespace | 	return clientset, dynamicClient, namespace | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,8 +1,10 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	goflags "flag" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"github.com/spf13/cobra" | 	"github.com/spf13/cobra" | ||||||
|  | 	"k8s.io/klog/v2" | ||||||
| 	"os" | 	"os" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -59,11 +61,12 @@ func generateLinkerdPolicies(files []string, options *Options) error { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	clientset, _ := GetKubernetesConnection() | 	clientset, dynClient, _ := GetKubernetesConnection() | ||||||
| 	cluster, err := NewCluster(clientset) | 	cluster, err := NewCluster(clientset, dynClient) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	fmt.Fprintf(os.Stderr, "Enhancing configuration based on running cluster\n") | ||||||
| 	config.Infer(cluster) | 	config.Infer(cluster) | ||||||
| 
 | 
 | ||||||
| 	policyTemplates, err := NewPolicyTemplates() | 	policyTemplates, err := NewPolicyTemplates() | ||||||
| @ -81,6 +84,8 @@ func generateLinkerdPolicies(files []string, options *Options) error { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func main() { | func main() { | ||||||
|  | 	klogFlags := goflags.NewFlagSet("", goflags.PanicOnError) | ||||||
|  | 	klog.InitFlags(klogFlags) | ||||||
| 
 | 
 | ||||||
| 	options := Options{ | 	options := Options{ | ||||||
| 		cni:        "cilium", | 		cni:        "cilium", | ||||||
|  | |||||||
							
								
								
									
										150
									
								
								cmd/policygen/prometheus.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										150
									
								
								cmd/policygen/prometheus.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,150 @@ | |||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	v1 "k8s.io/api/core/v1" | ||||||
|  | 	"k8s.io/klog/v2" | ||||||
|  | 	"regexp" | ||||||
|  | 	"slices" | ||||||
|  | 	"strconv" | ||||||
|  | 	"strings" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var standardPodLabels = map[string]func(pod *v1.Pod) string{ | ||||||
|  | 	"__meta_kubernetes_pod_name":            func(pod *v1.Pod) string { return pod.Name }, | ||||||
|  | 	"__meta_kubernetes_pod_ip":              func(pod *v1.Pod) string { return pod.Status.PodIP }, | ||||||
|  | 	"__meta_kubernetes_pod_node_name":       func(pod *v1.Pod) string { return pod.Spec.NodeName }, | ||||||
|  | 	"__meta_kubernetes_pod_uid":             func(pod *v1.Pod) string { return string(pod.UID) }, | ||||||
|  | 	"__meta_kubernetes_pod_phase":           func(pod *v1.Pod) string { return string(pod.Status.Phase) }, | ||||||
|  | 	"__meta_kubernetes_pod_ready":           func(pod *v1.Pod) string { return getPodReadyCondition(pod) }, | ||||||
|  | 	"__meta_kubernetes_pod_host_ip":         func(pod *v1.Pod) string { return pod.Status.HostIP }, | ||||||
|  | 	"__meta_kubernetes_namespace":           func(pod *v1.Pod) string { return pod.Namespace }, | ||||||
|  | 	"__meta_kubernetes_pod_container_init":  func(pod *v1.Pod) string { return "false" }, | ||||||
|  | 	"__meta_kubernetes_pod_service_account": func(pod *v1.Pod) string { return pod.Spec.ServiceAccountName }, | ||||||
|  | 	"__meta_kubernetes_pod_controller_kind": func(pod *v1.Pod) string { return getControllerKind(pod) }, | ||||||
|  | 	"__meta_kubernetes_pod_controller_name": func(pod *v1.Pod) string { return getControllerName(pod) }, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var containerLabels = map[string]func(pod *v1.Pod, container *v1.Container) string{ | ||||||
|  | 	"__meta_kubernetes_pod_container_name": func(pod *v1.Pod, container *v1.Container) string { | ||||||
|  | 		return container.Name | ||||||
|  | 	}, | ||||||
|  | 	"__meta_kubernetes_pod_container_id": func(pod *v1.Pod, container *v1.Container) string { | ||||||
|  | 		return getContainerID(pod, container) | ||||||
|  | 	}, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func metadataPodLabels(pod *v1.Pod) map[string]string { | ||||||
|  | 	labels := make(map[string]string) | ||||||
|  | 	for name, value := range pod.Labels { | ||||||
|  | 		key := "__meta_kubernetes_pod_label_" + strings.ReplaceAll(name, ".", "_") | ||||||
|  | 		key = strings.ReplaceAll(key, "/", "_") | ||||||
|  | 		key = strings.ReplaceAll(key, "-", "_") | ||||||
|  | 		labels[key] = value | ||||||
|  | 	} | ||||||
|  | 	for name, value := range pod.Annotations { | ||||||
|  | 		key := "__meta_kubernetes_pod_annotation_" + strings.ReplaceAll(name, ".", "_") | ||||||
|  | 		key = strings.ReplaceAll(key, "/", "_") | ||||||
|  | 		key = strings.ReplaceAll(key, "-", "_") | ||||||
|  | 		labels[key] = value | ||||||
|  | 	} | ||||||
|  | 	klog.V(6).Infof("METADATA LABELS %v\n", labels) | ||||||
|  | 	return labels | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func getContainerPortSourceLabels(port *v1.ContainerPort) map[string]string { | ||||||
|  | 	labels := make(map[string]string) | ||||||
|  | 	labels["__meta_kubernetes_pod_container_port_name"] = port.Name | ||||||
|  | 	labels["__meta_kubernetes_pod_container_port_number"] = strconv.FormatInt(int64(port.ContainerPort), 10) | ||||||
|  | 	labels["__meta_kubernetes_pod_container_port_protocol"] = string(port.Protocol) | ||||||
|  | 	return labels | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type RelabelingMatcher func(pod *v1.Pod) ([]v1.ContainerPort, error) | ||||||
|  | 
 | ||||||
|  | func MatchRelabeling(config RelabelConfig, pod *v1.Pod, container *v1.Container, | ||||||
|  | 	port *v1.ContainerPort) (bool, error) { | ||||||
|  | 	if config.Action != "keep" { | ||||||
|  | 		return true, nil | ||||||
|  | 	} | ||||||
|  | 	labelValues := make([]string, len(config.SourceLabels)) | ||||||
|  | 	for isource, sourceLabel := range config.SourceLabels { | ||||||
|  | 		if podLabelFunc, ok := standardPodLabels[sourceLabel]; ok { | ||||||
|  | 			labelValues[isource] = podLabelFunc(pod) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		metadataLabels := metadataPodLabels(pod) | ||||||
|  | 		if metadataLabelValue, ok := metadataLabels[sourceLabel]; ok { | ||||||
|  | 			labelValues[isource] = metadataLabelValue | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		if containerLabelFunc, ok := containerLabels[sourceLabel]; ok { | ||||||
|  | 			labelValues[isource] = containerLabelFunc(pod, container) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		containerPortSourceLabels := getContainerPortSourceLabels(port) | ||||||
|  | 		if containerPortValue, ok := containerPortSourceLabels[sourceLabel]; ok { | ||||||
|  | 			labelValues[isource] = containerPortValue | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	// concatenate the label values
 | ||||||
|  | 	labelstring := strings.Join(labelValues, ";") | ||||||
|  | 	matched, err := regexp.MatchString(config.Regex, labelstring) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false, err | ||||||
|  | 	} | ||||||
|  | 	klog.V(5).Infof("      relabeling '%s' ~ '%s': %v\n", labelstring, config.Regex, matched) | ||||||
|  | 	return matched, err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func combinations(s [][]string, i int, sequence []string) { | ||||||
|  | 	if i == len(s) { | ||||||
|  | 		fmt.Printf("COMBINATION %v\n", sequence) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	for _, v := range s[i] { | ||||||
|  | 		sequence[i] = v | ||||||
|  | 		combinations(s, i+1, sequence) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Helper functions
 | ||||||
|  | func getPodReadyCondition(pod *v1.Pod) string { | ||||||
|  | 	for _, condition := range pod.Status.Conditions { | ||||||
|  | 		if condition.Type == v1.PodReady { | ||||||
|  | 			if condition.Status == v1.ConditionTrue { | ||||||
|  | 				return "true" | ||||||
|  | 			} | ||||||
|  | 			return "false" | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return "unknown" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func getContainerID(pod *v1.Pod, container *v1.Container) string { | ||||||
|  | 	containerIndex := slices.IndexFunc(pod.Spec.Containers, func(c v1.Container) bool { | ||||||
|  | 		return c.Name == container.Name | ||||||
|  | 	}) | ||||||
|  | 	id := pod.Status.ContainerStatuses[containerIndex].ContainerID | ||||||
|  | 	// Strip the docker:// prefix if present
 | ||||||
|  | 	return strings.TrimPrefix(id, "docker://") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func getControllerKind(pod *v1.Pod) string { | ||||||
|  | 	for _, owner := range pod.OwnerReferences { | ||||||
|  | 		if owner.Controller != nil && *owner.Controller { | ||||||
|  | 			return owner.Kind | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func getControllerName(pod *v1.Pod) string { | ||||||
|  | 	for _, owner := range pod.OwnerReferences { | ||||||
|  | 		if owner.Controller != nil && *owner.Controller { | ||||||
|  | 			return owner.Name | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
							
								
								
									
										2
									
								
								cmd/policygen/templates/linkerd/namespace/monitored.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								cmd/policygen/templates/linkerd/namespace/monitored.yaml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,2 @@ | |||||||
|  | 
 | ||||||
|  | # a rule that allows ingress from monitoring | ||||||
							
								
								
									
										20
									
								
								cmd/policygen/templates/linkerd/namespace/namespace.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								cmd/policygen/templates/linkerd/namespace/namespace.yaml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,20 @@ | |||||||
|  | --- | ||||||
|  | # a Server matching all pods | ||||||
|  | # 1. must scan podmonitors and service monitoros | ||||||
|  | #    must scan for all applicable podmonitor and servicemonitor resources | ||||||
|  | #    Based on namespaceSelector: any (bool), matchNames ([]string) | ||||||
|  | #    spec.selector determines the pods. | ||||||
|  | # | ||||||
|  | # 2. determine pods targeted | ||||||
|  | # 2. for each targeted pod, determine the port number | ||||||
|  | # 3. for each targeted pod determine the application it belongs to (so we know the labels to use) | ||||||
|  | # 4. create a rule for the given port to the given application to allow access by monitoring. | ||||||
|  | # | ||||||
|  | # Build mapping of application -> pod | ||||||
|  | #                  pod -> podmonitors -> port(s) | ||||||
|  | #                  pod -> endpoint | ||||||
|  | # | ||||||
|  | # linkerd scraping port | ||||||
|  | #   linkerd-admin port on linkerd-proxy containers in any namespace, as long as they have the label linkerd.io/control-plane-ns=linkerd. | ||||||
|  | 
 | ||||||
|  | # a MeshTlsAuthentication matching all pods | ||||||
							
								
								
									
										3
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,6 +12,8 @@ require ( | |||||||
| 	k8s.io/api v0.32.0 | 	k8s.io/api v0.32.0 | ||||||
| 	k8s.io/apimachinery v0.32.0 | 	k8s.io/apimachinery v0.32.0 | ||||||
| 	k8s.io/client-go v0.32.0 | 	k8s.io/client-go v0.32.0 | ||||||
|  | 	k8s.io/klog/v2 v2.130.1 | ||||||
|  | 
 | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| require ( | require ( | ||||||
| @ -59,7 +61,6 @@ require ( | |||||||
| 	gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect | 	gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect | ||||||
| 	gopkg.in/inf.v0 v0.9.1 // indirect | 	gopkg.in/inf.v0 v0.9.1 // indirect | ||||||
| 	gopkg.in/yaml.v3 v3.0.1 // indirect | 	gopkg.in/yaml.v3 v3.0.1 // indirect | ||||||
| 	k8s.io/klog/v2 v2.130.1 // indirect |  | ||||||
| 	k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect | 	k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect | ||||||
| 	k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect | 	k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect | ||||||
| 	sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect | 	sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user