From 58ef7f76e44eb1858c6db2d0eb8a9bd7d32bdfd2 Mon Sep 17 00:00:00 2001 From: Erik Brakkee Date: Sun, 2 Mar 2025 09:14:48 +0100 Subject: [PATCH] only pulling images that are ready for 1 hour on another node. Also configurable ready interval. --- cmd/fetcher/main.go | 10 +++- cmd/fetcher/sync.go | 128 ++++++++++++++++++++++++++++++-------------- 2 files changed, 97 insertions(+), 41 deletions(-) diff --git a/cmd/fetcher/main.go b/cmd/fetcher/main.go index 7887a7b..8d36287 100644 --- a/cmd/fetcher/main.go +++ b/cmd/fetcher/main.go @@ -5,6 +5,7 @@ import ( "github.com/spf13/cobra" "k8s.io/klog/v2" "os" + "time" ) func main() { @@ -14,6 +15,8 @@ func main() { var kubernetesNamespace string var socketPath string var containerdNamespace string + var nodename string + var readyDuration time.Duration cmd := &cobra.Command{ Use: "kube-fetcher", @@ -23,7 +26,8 @@ Queries k8s for all running pods and makes sure that all images referenced in pods are made available on the local k8s node and pinned so they don't get garbage collected'`, RunE: func(cmd *cobra.Command, args []string) error { - err := pullAndPin(kubernetesNamespace, socketPath, containerdNamespace) + err := pullAndPin(kubernetesNamespace, socketPath, containerdNamespace, nodename, + readyDuration) return err }, } @@ -34,6 +38,10 @@ so they don't get garbage collected'`, "/run/containerd/containerd.sock", "Containerd socket") cmd.PersistentFlags().StringVar(&containerdNamespace, "containerd-namespace", "k8s.io", "Containerd namespace to use") + cmd.PersistentFlags().StringVar(&nodename, "nodename", "", + "Kubernetes node name the fetcher is running on, it will only fetch images running on other nodes") + cmd.PersistentFlags().DurationVar(&readyDuration, "ready-duration", + 1*time.Hour, "Time a pod must be ready before its image will be fetched") cmd.Flags().AddGoFlagSet(klogFlags) err := cmd.Execute() diff --git a/cmd/fetcher/sync.go b/cmd/fetcher/sync.go index 79e8b31..c791a83 100644 --- a/cmd/fetcher/sync.go +++ 
b/cmd/fetcher/sync.go @@ -3,60 +3,84 @@ package main import ( "context" "fmt" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "regexp" "strings" + "time" ) func canonicalizeImageName(image string) string { + pattern := `^(?:(?P<registry>[a-zA-Z0-9][-a-zA-Z0-9.]*[a-zA-Z0-9](?::[0-9]+)?)/)?(?P<repository>(?:[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*(?:/[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*)*)?)?(?::(?P<tag>[\w][\w.-]{0,127}))?(?:@(?P<digest>[a-z][a-z0-9]*(?:[+.-][a-z][a-z0-9]*)*:[a-zA-Z0-9*+,-./:;=@_]{32,}))?$` + re := regexp.MustCompile(pattern) + matches := re.FindStringSubmatch(image) + subexpNames := re.SubexpNames() + + if matches == nil { + panic(fmt.Errorf("Invalid image reference: %s\n", image)) + } + result := make(map[string]string) + for i, name := range subexpNames { + if i != 0 && name != "" && i < len(matches) { + result[name] = matches[i] + } + } + klog.V(3).Infof("Image: %s\n", image) + klog.V(3).Infof(" Registry: %s\n", result["registry"]) + klog.V(3).Infof(" Repository: %s\n", result["repository"]) + klog.V(3).Infof(" Tag: %s\n", result["tag"]) + klog.V(3).Infof(" Digest: %s\n", result["digest"]) + + registry := result["registry"] + repository := result["repository"] + tag := result["tag"] + digest := result["digest"] + // Check if image has a tag - var name, tag string - if parts := strings.Split(image, ":"); len(parts) > 1 { - name = parts[0] - tag = parts[1] - } else { - name = image - tag = "latest" // Default tag + if digest == "" && tag == "" { + tag = "latest" } // Check if image has a host - var host, remainder string - if strings.Contains(name, "/") { - parts := strings.SplitN(name, "/", 2) - - // Determine if the first part is a host - // A host must contain a dot or colon (for registries with ports) - if strings.Contains(parts[0], ".") || strings.Contains(parts[0], ":") { - host = parts[0] - remainder = parts[1] - } else { - // No host specified, use default docker.io - 
host = "docker.io" - remainder = name - } - } else { - // No host and no /, use default docker.io and library - host = "docker.io" - remainder = "library/" + name + if registry == "" { + registry = "docker.io" } // Handle the case when remainder doesn't specify library but it's not a docker.io official image - if host == "docker.io" && !strings.Contains(remainder, "/") { - remainder = "library/" + remainder - } else if host == "docker.io" && !strings.HasPrefix(remainder, "library/") { - // Check if it's not already in the library - if parts := strings.SplitN(remainder, "/", 2); len(parts) == 2 && parts[0] != "library" { - // Not a library image and not already properly formatted - // This is a docker.io user image (e.g., user/image) - // We keep it as is - } + if registry == "docker.io" && !strings.Contains(repository, "/") { + repository = "library/" + repository } - return fmt.Sprintf("%s/%s:%s", host, remainder, tag) + fullimage := registry + "/" + repository + if tag != "" { + fullimage += ":" + tag + } + if digest != "" { + fullimage += "@" + digest + } + return fullimage } -func getContainers(clientset *kubernetes.Clientset, kubernetesNamespace string) map[string]bool { +func isReadyForSomeTime(pod *v1.Pod, duration time.Duration) bool { + ready := false + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { + if time.Now().Sub(condition.LastTransitionTime.Time) > duration { + ready = true + } + } + } + if pod.DeletionTimestamp != nil { + ready = false + } + return ready +} + +func getContainers(clientset *kubernetes.Clientset, kubernetesNamespace, nodename string, + readyDuration time.Duration) map[string]bool { pods, err := clientset.CoreV1().Pods(kubernetesNamespace).List(context.Background(), metav1.ListOptions{}) if err != nil { @@ -64,22 +88,40 @@ func getContainers(clientset *kubernetes.Clientset, kubernetesNamespace string) } containers := make(map[string]bool) + 
containersOnCurrentNode := make(map[string]bool) for _, pod := range pods.Items { klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name) for _, container := range pod.Spec.InitContainers { klog.V(3).Infof(" %s\n", container.Image) - containers[canonicalizeImageName(container.Image)] = true + if pod.Spec.NodeName == nodename { + containersOnCurrentNode[canonicalizeImageName(container.Image)] = true + } else { + if isReadyForSomeTime(&pod, readyDuration) { + containers[canonicalizeImageName(container.Image)] = true + } + } } for _, container := range pod.Spec.Containers { klog.V(3).Infof(" %s\n", container.Image) - containers[canonicalizeImageName(container.Image)] = true + if pod.Spec.NodeName == nodename { + containersOnCurrentNode[canonicalizeImageName(container.Image)] = true + } else { + if isReadyForSomeTime(&pod, readyDuration) { + containers[canonicalizeImageName(container.Image)] = true + } + } } } + + for container, _ := range containersOnCurrentNode { + delete(containers, container) + } return containers } -func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace string) error { +func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace, nodename string, + readyDuration time.Duration) error { // Create the image manager containerd, err := NewContainerd(socketPath, containerdNamespace) @@ -94,7 +136,7 @@ func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace string) err clientset, _ := GetKubernetesConnection() - containers := getContainers(clientset, kubernetesNamespace) + containers := getContainers(clientset, kubernetesNamespace, nodename, readyDuration) for container, _ := range containers { klog.V(3).Infof("Found container %s\n", container) } @@ -138,3 +180,9 @@ func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace string) err } return nil } + +// TODO +// 1. periodic pull and pin, configurable time interval +// 2. logging summary results of each pull and pin +// 3. 
docker container with multi-stage build +// 4. kustomize deployment in wamblee-operators namespace (kubernetes-setup repo)