From e8d4adaf53da6c1557e887915fc162dfe85b7bac Mon Sep 17 00:00:00 2001
From: Erik Brakkee
Date: Sun, 2 Mar 2025 19:47:44 +0100
Subject: [PATCH] Poll for pods periodically. Also handle resources deleted
 within the monitoring window, and do not sync brand-new pods to other nodes
 immediately, to avoid image pull rate limits caused by this code.

---
 cmd/fetcher/config.go  |  2 ++
 cmd/fetcher/fetcher.go | 36 +++++++++-----------
 cmd/fetcher/main.go    | 28 ++++++++++++-----
 cmd/fetcher/watcher.go | 72 +++++++++++++++++++++++++++++++++++++-----
 4 files changed, 101 insertions(+), 37 deletions(-)

diff --git a/cmd/fetcher/config.go b/cmd/fetcher/config.go
index f4e4707..55416f8 100644
--- a/cmd/fetcher/config.go
+++ b/cmd/fetcher/config.go
@@ -3,10 +3,12 @@ package main
 import "time"
 
 type Config struct {
+    PollInterval           time.Duration
     KubernetesNamespace    string
     SocketPath             string
     ContainerdNamespace    string
     Nodename               string
     ReadyDuration          time.Duration
     includeControllerNodes bool
+    monitoringWindowSize   time.Duration
 }
diff --git a/cmd/fetcher/fetcher.go b/cmd/fetcher/fetcher.go
index b10462c..11ac479 100644
--- a/cmd/fetcher/fetcher.go
+++ b/cmd/fetcher/fetcher.go
@@ -3,7 +3,6 @@ package main
 import (
     "context"
     "fmt"
-    corev1 "k8s.io/api/core/v1"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
@@ -15,14 +14,16 @@ import (
 )
 
 type Fetcher struct {
-    config    *Config
     clientset *kubernetes.Clientset
+    config    *Config
+    watcher   *Watcher
 }
 
-func NewFetcher(config *Config, clientset *kubernetes.Clientset) *Fetcher {
+func NewFetcher(clientset *kubernetes.Clientset, config *Config, watcher *Watcher) *Fetcher {
     return &Fetcher{
         config:    config,
         clientset: clientset,
+        watcher:   watcher,
     }
 }
 
@@ -77,48 +78,33 @@ func (fetcher *Fetcher) canonicalizeImageName(image string) string {
     return fullimage
 }
 
-func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod, controllers map[string]bool) bool {
-    ready := false
+func (fetcher *Fetcher) wasReady(pod *v1.Pod, controllers map[string]bool) bool {
     if !fetcher.config.includeControllerNodes {
-        klog.Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName)
+        klog.V(3).Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName)
         if _, ok := controllers[pod.Spec.NodeName]; ok {
             return false
         }
     }
-    for _, condition := range pod.Status.Conditions {
-        if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
-            if time.Now().Sub(condition.LastTransitionTime.Time) > fetcher.config.ReadyDuration {
-                ready = true
-            }
-        }
-    }
-    if pod.DeletionTimestamp != nil {
-        ready = false
-    }
-    return ready
+    return time.Since(pod.CreationTimestamp.Time) >= fetcher.config.ReadyDuration
 }
 
 func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[string]bool {
     controllers := fetcher.getControllerNames(clientset)
-    pods, err := clientset.CoreV1().Pods(fetcher.config.KubernetesNamespace).List(context.Background(),
-        metav1.ListOptions{})
-    if err != nil {
-        panic(err)
-    }
+    pods := fetcher.watcher.getPods()
 
     containers := make(map[string]bool)
     containersOnCurrentNode := make(map[string]bool)
-    for _, pod := range pods.Items {
+    for _, pod := range pods {
         klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name)
         for _, container := range pod.Spec.InitContainers {
             klog.V(3).Infof("  %s\n", container.Image)
             if pod.Spec.NodeName == fetcher.config.Nodename {
                 containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
             } else {
-                if fetcher.isReadyForSomeTime(&pod, controllers) {
+                if fetcher.wasReady(pod, controllers) {
                     containers[fetcher.canonicalizeImageName(container.Image)] = true
                 }
             }
@@ -128,7 +114,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[string]bool {
             if pod.Spec.NodeName == fetcher.config.Nodename {
                 containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
             } else {
-                if fetcher.isReadyForSomeTime(&pod, controllers) {
+                if fetcher.wasReady(pod, controllers) {
                     containers[fetcher.canonicalizeImageName(container.Image)] = true
                 }
             }
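The wasReady check above replaces the old Ready-condition test with a plain age
test: a pod on another node now qualifies once it has existed for ReadyDuration,
whether or not it is currently Ready. A minimal sketch of that behavior, assuming
it sits inside package main of cmd/fetcher and reuses the package's existing
imports (fmt, time, v1, metav1); the helper name exampleWasReady and the
durations are made up:

    // Hypothetical helper inside cmd/fetcher: a pod created two hours ago
    // passes a one-hour ReadyDuration, even without a Ready condition.
    func exampleWasReady() {
        f := &Fetcher{config: &Config{
            ReadyDuration:          time.Hour,
            includeControllerNodes: true, // skip the controller-node filter
        }}
        pod := &v1.Pod{}
        pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-2 * time.Hour))
        fmt.Println(f.wasReady(pod, nil)) // prints: true (2h old >= 1h)
    }

A consequence of this design is that the age check alone would also accept pods
that were never Ready; the informer in watcher.go compensates by only recording
pods whose Ready condition is true.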
diff --git a/cmd/fetcher/main.go b/cmd/fetcher/main.go
index 54da96a..cbfa39e 100644
--- a/cmd/fetcher/main.go
+++ b/cmd/fetcher/main.go
@@ -14,7 +14,6 @@ func main() {
     clientset := GetKubernetesConnection()
 
     config := &Config{}
-    fetcher := NewFetcher(config, clientset)
 
     cmd := &cobra.Command{
         Use: "kube-fetcher",
@@ -24,11 +23,26 @@ Queries k8s for all running pods and makes sure that all
 images referenced in pods are made available on the local k8s node and pinned
 so they don't get garbage collected'`,
         RunE: func(cmd *cobra.Command, args []string) error {
-            err := fetcher.pullAndPin()
-            //watcher := Watcher{}
-            //watcher.WatchPods(clientset, config.KubernetesNamespace)
+            serializer := make(chan func())
+            go func() {
+                for action := range serializer {
+                    action()
+                }
+            }()
+            watcher := NewWatcher(clientset, config.monitoringWindowSize, config.KubernetesNamespace, serializer)
+            fetcher := NewFetcher(clientset, config, watcher)
 
-            return err
+            ticker := time.NewTicker(config.PollInterval)
+            defer ticker.Stop()
+            for range ticker.C {
+                serializer <- func() {
+                    klog.V(3).Infof("Fetcher.pullAndPin")
+                    if err := fetcher.pullAndPin(); err != nil {
+                        klog.Errorf("pullAndPin failed: %v", err)
+                    }
+                }
+            }
+            return nil
         },
     }
@@ -44,6 +58,10 @@ so they don't get garbage collected'`,
         1*time.Hour, "Time a pod must be ready before its image will be fetched")
     cmd.PersistentFlags().BoolVar(&config.includeControllerNodes, "include-controllers", false,
         "Include controller nodes")
+    cmd.PersistentFlags().DurationVar(&config.monitoringWindowSize, "monitoring-window",
+        6*time.Hour, "Monitoring window in which pods are considered to have been active")
+    cmd.PersistentFlags().DurationVar(&config.PollInterval, "poll-interval",
+        1*time.Minute, "Poll interval for checking whether to pull images")
     cmd.Flags().AddGoFlagSet(klogFlags)
 
     err := cmd.Execute()
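All mutation of shared state is funneled through the serializer channel created
in RunE: the informer callbacks in watcher.go and the periodic pullAndPin submit
closures, and a single goroutine runs them one at a time, so no mutex is needed
on the watcher's pod map. A self-contained sketch of the pattern (all names are
illustrative):

    package main

    import "fmt"

    func main() {
        serializer := make(chan func())
        done := make(chan struct{})
        go func() {
            // The single owner of the shared state: submitted closures
            // run one at a time, in submission order.
            for action := range serializer {
                action()
            }
            close(done)
        }()
        counter := 0 // stands in for shared state such as watcher.pods
        for i := 0; i < 3; i++ {
            serializer <- func() { counter++ }
        }
        serializer <- func() { fmt.Println("counter =", counter) } // counter = 3
        close(serializer)
        <-done
    }

Compared to guarding the map with a sync.Mutex, the channel also serializes the
periodic pullAndPin with the map updates, so the fetcher never observes a
half-updated pod map.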
diff --git a/cmd/fetcher/watcher.go b/cmd/fetcher/watcher.go
index cb13e92..ab72c1c 100644
--- a/cmd/fetcher/watcher.go
+++ b/cmd/fetcher/watcher.go
@@ -1,17 +1,56 @@
 package main
 
 import (
+    "fmt"
     corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/fields"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/cache"
     "k8s.io/klog/v2"
+    "time"
 )
 
 type Watcher struct {
+    monitoringWindow time.Duration
+    // Pods that were active within the monitoring window. Entries whose
+    // deletion falls outside that window are pruned in getPods.
+    // Map of Pod.metadata.uid to pod.
+    pods       map[types.UID]*corev1.Pod
+    serializer chan<- func()
 }
 
-func (watcher *Watcher) WatchPods(
+func NewWatcher(clientset *kubernetes.Clientset, monitoringWindow time.Duration, namespace string, serializer chan<- func()) *Watcher {
+    watcher := &Watcher{
+        monitoringWindow: monitoringWindow,
+        pods:             make(map[types.UID]*corev1.Pod),
+        serializer:       serializer,
+    }
+    watcher.watchPods(clientset, namespace)
+    return watcher
+}
+
+func (watcher *Watcher) getPods() []*corev1.Pod {
+    pods := make(map[types.UID]*corev1.Pod)
+    klog.V(3).Infof("PODS %v", watcher.pods)
+    res := make([]*corev1.Pod, 0)
+    for uid, pod := range watcher.pods {
+        klog.V(3).Infof("Checking pod %s/%s\n", pod.Namespace, pod.Name)
+        if pod.DeletionTimestamp == nil ||
+            pod.DeletionTimestamp.IsZero() ||
+            time.Since(pod.DeletionTimestamp.Time) <= watcher.monitoringWindow {
+            klog.V(3).Infof("Pod found in monitoring window %s/%s\n",
+                pod.Namespace, pod.Name)
+            pods[uid] = pod
+            res = append(res, pod)
+        }
+    }
+    watcher.pods = pods
+    return res
+}
+
+func (watcher *Watcher) watchPods(
     clientset *kubernetes.Clientset,
     namespace string) {
@@ -23,8 +62,17 @@ func (watcher *Watcher) WatchPods(
     )
 
     addOrUpdate := func(obj interface{}) {
-        pod := watcher.getPod(obj)
-        klog.Infof("Added/updated %s/%s\n", pod.Namespace, pod.Name)
+        watcher.serializer <- func() {
+            pod := watcher.getPod(obj)
+            // Only add the pod if all of its containers are ready.
+            for _, condition := range pod.Status.Conditions {
+                if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
+                    return
+                }
+            }
+            klog.V(3).Infof("Added/updated %s/%s\n", pod.Namespace, pod.Name)
+            watcher.pods[pod.UID] = pod
+        }
     }
 
     options := cache.InformerOptions{
@@ -36,8 +84,15 @@ func (watcher *Watcher) WatchPods(
             addOrUpdate(obj)
         },
         DeleteFunc: func(obj any) {
-            pod := watcher.getPod(obj)
-            klog.Infof("Delete %s/%s\n", pod.Namespace, pod.Name)
+            watcher.serializer <- func() {
+                pod := watcher.getPod(obj)
+                klog.V(3).Infof("Delete %s/%s\n", pod.Namespace, pod.Name)
+                // Stamp the deletion time on the tracked pod, guarding
+                // against pods that were never stored (nil map entry).
+                if tracked, ok := watcher.pods[pod.UID]; ok {
+                    tracked.DeletionTimestamp = &metav1.Time{Time: time.Now()}
+                }
+            }
         },
     },
     ResyncPeriod: 0,
@@ -47,7 +102,10 @@ func (watcher *Watcher) WatchPods(
     stop := make(chan struct{})
-    defer close(stop)
     go controller.Run(stop)
-    select {}
+    // Wait for the cache to sync before returning. The stop channel is
+    // deliberately left open so that the informer keeps running.
+    if !cache.WaitForCacheSync(stop, controller.HasSynced) {
+        panic(fmt.Errorf("failed to sync cache"))
+    }
 }
 
 func (watcher *Watcher) getPod(obj any) *corev1.Pod {
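The monitoring window gives deleted pods a grace period: getPods keeps
returning a pod until its recorded deletion falls outside the window, so
recently used images stay pinned for a while after their pod disappears. A
self-contained sketch of the pruning rule (the one-hour deletion age is
illustrative):

    package main

    import (
        "fmt"
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
        window := 6 * time.Hour // the --monitoring-window default
        deletedAt := metav1.NewTime(time.Now().Add(-1 * time.Hour))
        // Same rule as getPods: keep a pod that was never deleted, or
        // whose deletion is more recent than the monitoring window.
        keep := deletedAt.IsZero() || time.Since(deletedAt.Time) <= window
        fmt.Println(keep) // prints: true (deleted 1h ago, inside 6h window)
    }

With the new flags, a typical invocation would be
kube-fetcher --poll-interval=1m --monitoring-window=6h.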