From 2e537818fb08ad17932dab8d839ca16fd79a4acb Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sun, 2 Mar 2025 17:41:25 +0100 Subject: [PATCH] towards a structure with more dependency injection and uisng a moving window to see what pods were active in the given interval. --- cmd/fetcher/config.go | 12 ++++++++ cmd/fetcher/fetcher.go | 35 ++++++++++++----------- cmd/fetcher/kubernetes.go | 8 ++---- cmd/fetcher/main.go | 19 ++++++++----- cmd/fetcher/watcher.go | 59 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 103 insertions(+), 30 deletions(-) create mode 100644 cmd/fetcher/config.go create mode 100644 cmd/fetcher/watcher.go diff --git a/cmd/fetcher/config.go b/cmd/fetcher/config.go new file mode 100644 index 0000000..f4e4707 --- /dev/null +++ b/cmd/fetcher/config.go @@ -0,0 +1,12 @@ +package main + +import "time" + +type Config struct { + KubernetesNamespace string + SocketPath string + ContainerdNamespace string + Nodename string + ReadyDuration time.Duration + includeControllerNodes bool +} diff --git a/cmd/fetcher/fetcher.go b/cmd/fetcher/fetcher.go index 65f3cbc..b10462c 100644 --- a/cmd/fetcher/fetcher.go +++ b/cmd/fetcher/fetcher.go @@ -15,12 +15,15 @@ import ( ) type Fetcher struct { - KubernetesNamespace string - SocketPath string - ContainerdNamespace string - Nodename string - ReadyDuration time.Duration - includeControllerNodes bool + config *Config + clientset *kubernetes.Clientset +} + +func NewFetcher(config *Config, clientset *kubernetes.Clientset) *Fetcher { + return &Fetcher{ + config: config, + clientset: clientset, + } } func (fetcher *Fetcher) canonicalizeImageName(image string) string { @@ -76,7 +79,7 @@ func (fetcher *Fetcher) canonicalizeImageName(image string) string { func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod, controllers map[string]bool) bool { ready := false - if !fetcher.includeControllerNodes { + if !fetcher.config.includeControllerNodes { klog.Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName) if _, ok := controllers[pod.Spec.NodeName]; ok { return false @@ -84,7 +87,7 @@ func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod, controllers map[string]b } for _, condition := range pod.Status.Conditions { if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { - if time.Now().Sub(condition.LastTransitionTime.Time) > fetcher.ReadyDuration { + if time.Now().Sub(condition.LastTransitionTime.Time) > fetcher.config.ReadyDuration { ready = true } } @@ -99,7 +102,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin controllers := fetcher.getControllerNames(clientset) - pods, err := clientset.CoreV1().Pods(fetcher.KubernetesNamespace).List(context.Background(), + pods, err := clientset.CoreV1().Pods(fetcher.config.KubernetesNamespace).List(context.Background(), metav1.ListOptions{}) if err != nil { panic(err) @@ -112,7 +115,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name) for _, container := range pod.Spec.InitContainers { klog.V(3).Infof(" %s\n", container.Image) - if pod.Spec.NodeName == fetcher.Nodename { + if pod.Spec.NodeName == fetcher.config.Nodename { containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true } else { if fetcher.isReadyForSomeTime(&pod, controllers) { @@ -122,7 +125,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin } for _, container := range pod.Spec.Containers { klog.V(3).Infof(" %s\n", container.Image) - if pod.Spec.NodeName == fetcher.Nodename { + if pod.Spec.NodeName == fetcher.config.Nodename { containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true } else { if fetcher.isReadyForSomeTime(&pod, controllers) { @@ -132,7 +135,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin } } - for container, _ := range containersOnCurrentNode { + for container := range containersOnCurrentNode { delete(containers, container) } return containers @@ -141,7 +144,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin func (fetcher *Fetcher) pullAndPin() error { // Create the image manager - containerd, err := NewContainerd(fetcher.SocketPath, fetcher.ContainerdNamespace) + containerd, err := NewContainerd(fetcher.config.SocketPath, fetcher.config.ContainerdNamespace) if err != nil { klog.Fatalf("Failed to create image manager: %v", err) } @@ -151,10 +154,8 @@ func (fetcher *Fetcher) pullAndPin() error { return err } - clientset, _ := GetKubernetesConnection() - - containers := fetcher.getContainers(clientset) - for container, _ := range containers { + containers := fetcher.getContainers(fetcher.clientset) + for container := range containers { klog.V(3).Infof("Found container %s\n", container) } diff --git a/cmd/fetcher/kubernetes.go b/cmd/fetcher/kubernetes.go index 639c65b..3877d5a 100644 --- a/cmd/fetcher/kubernetes.go +++ b/cmd/fetcher/kubernetes.go @@ -6,7 +6,7 @@ import ( "log" ) -func GetKubernetesConnection() (*kubernetes.Clientset, string) { +func GetKubernetesConnection() *kubernetes.Clientset { loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() configOverrides := &clientcmd.ConfigOverrides{} kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) @@ -21,9 +21,5 @@ func GetKubernetesConnection() (*kubernetes.Clientset, string) { log.Panicln(err.Error()) } - namespace, _, err := kubeConfig.Namespace() - if err != nil { - log.Panicf("Could not get namespace") - } - return clientset, namespace + return clientset } diff --git a/cmd/fetcher/main.go b/cmd/fetcher/main.go index 5f04ce9..54da96a 100644 --- a/cmd/fetcher/main.go +++ b/cmd/fetcher/main.go @@ -12,7 +12,9 @@ func main() { klogFlags := goflags.NewFlagSet("", goflags.PanicOnError) klog.InitFlags(klogFlags) - fetcher := Fetcher{} + clientset := GetKubernetesConnection() + config := &Config{} + fetcher := NewFetcher(config, clientset) cmd := &cobra.Command{ Use: "kube-fetcher", @@ -23,21 +25,24 @@ images referenced in pods are made available on the local k8s node and pinned so they don't get garbage collected'`, RunE: func(cmd *cobra.Command, args []string) error { err := fetcher.pullAndPin() + //watcher := Watcher{} + //watcher.WatchPods(clientset, config.KubernetesNamespace) + return err }, } - cmd.PersistentFlags().StringVar(&fetcher.KubernetesNamespace, "kubernetes-namespace", + cmd.PersistentFlags().StringVar(&config.KubernetesNamespace, "kubernetes-namespace", "", "Kubernetes containerdNamespace to inspect (default is all namespaces)") - cmd.PersistentFlags().StringVar(&fetcher.SocketPath, "socket", + cmd.PersistentFlags().StringVar(&config.SocketPath, "socket", "/run/containerd/containerd.sock", "Containerd socket") - cmd.PersistentFlags().StringVar(&fetcher.ContainerdNamespace, "containerd-namespace", + cmd.PersistentFlags().StringVar(&config.ContainerdNamespace, "containerd-namespace", "k8s.io", "Containerd namespace to use") - cmd.PersistentFlags().StringVar(&fetcher.Nodename, "nodename", "", + cmd.PersistentFlags().StringVar(&config.Nodename, "nodename", "", "Kubernetes node name the fetcher is running on, it will only fetch images running on other nodes") - cmd.PersistentFlags().DurationVar(&fetcher.ReadyDuration, "ready-duration", + cmd.PersistentFlags().DurationVar(&config.ReadyDuration, "ready-duration", 1*time.Hour, "Time a pod must be ready before its image will be fetched") - cmd.PersistentFlags().BoolVar(&fetcher.includeControllerNodes, "include-controllers", + cmd.PersistentFlags().BoolVar(&config.includeControllerNodes, "include-controllers", false, "Include controller nodes") cmd.Flags().AddGoFlagSet(klogFlags) diff --git a/cmd/fetcher/watcher.go b/cmd/fetcher/watcher.go new file mode 100644 index 0000000..cb13e92 --- /dev/null +++ b/cmd/fetcher/watcher.go @@ -0,0 +1,59 @@ +package main + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +type Watcher struct { +} + +func (watcher *Watcher) WatchPods( + clientset *kubernetes.Clientset, + namespace string) { + + watchlist := cache.NewListWatchFromClient( + clientset.CoreV1().RESTClient(), + "pods", + namespace, + fields.Everything(), + ) + + addOrUpdate := func(obj interface{}) { + pod := watcher.getPod(obj) + klog.Infof("Added/updated %s/%s\n", pod.Namespace, pod.Name) + } + + options := cache.InformerOptions{ + ListerWatcher: watchlist, + ObjectType: &corev1.Pod{}, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: addOrUpdate, + UpdateFunc: func(_ any, obj any) { + addOrUpdate(obj) + }, + DeleteFunc: func(obj any) { + pod := watcher.getPod(obj) + klog.Infof("Delete %s/%s\n", pod.Namespace, pod.Name) + }, + }, + ResyncPeriod: 0, + } + + _, controller := cache.NewInformerWithOptions(options) + stop := make(chan struct{}) + defer close(stop) + go controller.Run(stop) + select {} +} + +func (watcher *Watcher) getPod(obj any) *corev1.Pod { + k8spod, ok := obj.(*corev1.Pod) + if !ok { + klog.Fatalf("Object of wrong type: %v", obj) + } + return k8spod +}