Only pull images used by pods on other nodes that have been ready for at least 1 hour.

The required ready duration is also configurable (--ready-duration).
This commit is contained in:
Erik Brakkee 2025-03-02 09:14:48 +01:00
parent fe485f7fe7
commit 58ef7f76e4
2 changed files with 97 additions and 41 deletions

View File

@ -5,6 +5,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"os" "os"
"time"
) )
func main() { func main() {
@ -14,6 +15,8 @@ func main() {
var kubernetesNamespace string var kubernetesNamespace string
var socketPath string var socketPath string
var containerdNamespace string var containerdNamespace string
var nodename string
var readyDuration time.Duration
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "kube-fetcher", Use: "kube-fetcher",
@ -23,7 +26,8 @@ Queries k8s for all running pods and makes sure that all
images referenced in pods are made available on the local k8s node and pinned images referenced in pods are made available on the local k8s node and pinned
so they don't get garbage collected'`, so they don't get garbage collected'`,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
err := pullAndPin(kubernetesNamespace, socketPath, containerdNamespace) err := pullAndPin(kubernetesNamespace, socketPath, containerdNamespace, nodename,
readyDuration)
return err return err
}, },
} }
@ -34,6 +38,10 @@ so they don't get garbage collected'`,
"/run/containerd/containerd.sock", "Containerd socket") "/run/containerd/containerd.sock", "Containerd socket")
cmd.PersistentFlags().StringVar(&containerdNamespace, "containerd-namespace", cmd.PersistentFlags().StringVar(&containerdNamespace, "containerd-namespace",
"k8s.io", "Containerd namespace to use") "k8s.io", "Containerd namespace to use")
cmd.PersistentFlags().StringVar(&nodename, "nodename", "",
"Kubernetes node name the fetcher is running on, it will only fetch images running on other nodes")
cmd.PersistentFlags().DurationVar(&readyDuration, "ready-duration",
1*time.Hour, "Time a pod must be ready before its image will be fetched")
cmd.Flags().AddGoFlagSet(klogFlags) cmd.Flags().AddGoFlagSet(klogFlags)
err := cmd.Execute() err := cmd.Execute()

View File

@ -3,60 +3,84 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"regexp"
"strings" "strings"
"time"
) )
func canonicalizeImageName(image string) string { func canonicalizeImageName(image string) string {
pattern := `^(?:(?P<registry>[a-zA-Z0-9][-a-zA-Z0-9.]*[a-zA-Z0-9](?::[0-9]+)?)/)?(?P<repository>(?:[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*(?:/[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*)*)?)?(?::(?P<tag>[\w][\w.-]{0,127}))?(?:@(?P<digest>[a-z][a-z0-9]*(?:[+.-][a-z][a-z0-9]*)*:[a-zA-Z0-9*+,-./:;=@_]{32,}))?$`
re := regexp.MustCompile(pattern)
matches := re.FindStringSubmatch(image)
subexpNames := re.SubexpNames()
if matches == nil {
panic(fmt.Errorf("Invalid image reference: %s\n", image))
}
result := make(map[string]string)
for i, name := range subexpNames {
if i != 0 && name != "" && i < len(matches) {
result[name] = matches[i]
}
}
klog.V(3).Infof("Image: %s\n", image)
klog.V(3).Infof(" Registry: %s\n", result["registry"])
klog.V(3).Infof(" Repository: %s\n", result["repository"])
klog.V(3).Infof(" Tag: %s\n", result["tag"])
klog.V(3).Infof(" Digest: %s\n", result["digest"])
registry := result["registry"]
repository := result["repository"]
tag := result["tag"]
digest := result["digest"]
// Check if image has a tag // Check if image has a tag
var name, tag string if digest == "" && tag == "" {
if parts := strings.Split(image, ":"); len(parts) > 1 { tag = "latest"
name = parts[0]
tag = parts[1]
} else {
name = image
tag = "latest" // Default tag
} }
// Check if image has a host // Check if image has a host
var host, remainder string if registry == "" {
if strings.Contains(name, "/") { registry = "docker.io"
parts := strings.SplitN(name, "/", 2)
// Determine if the first part is a host
// A host must contain a dot or colon (for registries with ports)
if strings.Contains(parts[0], ".") || strings.Contains(parts[0], ":") {
host = parts[0]
remainder = parts[1]
} else {
// No host specified, use default docker.io
host = "docker.io"
remainder = name
}
} else {
// No host and no /, use default docker.io and library
host = "docker.io"
remainder = "library/" + name
} }
// Handle the case when remainder doesn't specify library but it's not a docker.io official image // Handle the case when remainder doesn't specify library but it's not a docker.io official image
if host == "docker.io" && !strings.Contains(remainder, "/") { if registry == "docker.io" && !strings.Contains(repository, "/") {
remainder = "library/" + remainder repository = "library/" + repository
} else if host == "docker.io" && !strings.HasPrefix(remainder, "library/") {
// Check if it's not already in the library
if parts := strings.SplitN(remainder, "/", 2); len(parts) == 2 && parts[0] != "library" {
// Not a library image and not already properly formatted
// This is a docker.io user image (e.g., user/image)
// We keep it as is
}
} }
return fmt.Sprintf("%s/%s:%s", host, remainder, tag) fullimage := registry + "/" + repository
if tag != "" {
fullimage += ":" + tag
}
if digest != "" {
fullimage += "@" + digest
}
return fullimage
} }
func getContainers(clientset *kubernetes.Clientset, kubernetesNamespace string) map[string]bool { func isReadyForSomeTime(pod *v1.Pod, duration time.Duration) bool {
ready := false
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
if time.Now().Sub(condition.LastTransitionTime.Time) > duration {
ready = true
}
}
}
if pod.DeletionTimestamp != nil {
ready = false
}
return ready
}
func getContainers(clientset *kubernetes.Clientset, kubernetesNamespace, nodename string,
readyDuration time.Duration) map[string]bool {
pods, err := clientset.CoreV1().Pods(kubernetesNamespace).List(context.Background(), pods, err := clientset.CoreV1().Pods(kubernetesNamespace).List(context.Background(),
metav1.ListOptions{}) metav1.ListOptions{})
if err != nil { if err != nil {
@ -64,22 +88,40 @@ func getContainers(clientset *kubernetes.Clientset, kubernetesNamespace string)
} }
containers := make(map[string]bool) containers := make(map[string]bool)
containersOnCurrentNode := make(map[string]bool)
for _, pod := range pods.Items { for _, pod := range pods.Items {
klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name) klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name)
for _, container := range pod.Spec.InitContainers { for _, container := range pod.Spec.InitContainers {
klog.V(3).Infof(" %s\n", container.Image) klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == nodename {
containersOnCurrentNode[canonicalizeImageName(container.Image)] = true
} else {
if isReadyForSomeTime(&pod, readyDuration) {
containers[canonicalizeImageName(container.Image)] = true containers[canonicalizeImageName(container.Image)] = true
} }
}
}
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
klog.V(3).Infof(" %s\n", container.Image) klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == nodename {
containersOnCurrentNode[canonicalizeImageName(container.Image)] = true
} else {
if isReadyForSomeTime(&pod, readyDuration) {
containers[canonicalizeImageName(container.Image)] = true containers[canonicalizeImageName(container.Image)] = true
} }
} }
}
}
for container, _ := range containersOnCurrentNode {
delete(containers, container)
}
return containers return containers
} }
func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace string) error { func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace, nodename string,
readyDuration time.Duration) error {
// Create the image manager // Create the image manager
containerd, err := NewContainerd(socketPath, containerdNamespace) containerd, err := NewContainerd(socketPath, containerdNamespace)
@ -94,7 +136,7 @@ func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace string) err
clientset, _ := GetKubernetesConnection() clientset, _ := GetKubernetesConnection()
containers := getContainers(clientset, kubernetesNamespace) containers := getContainers(clientset, kubernetesNamespace, nodename, readyDuration)
for container, _ := range containers { for container, _ := range containers {
klog.V(3).Infof("Found container %s\n", container) klog.V(3).Infof("Found container %s\n", container)
} }
@ -138,3 +180,9 @@ func pullAndPin(kubernetesNamespace, socketPath, containerdNamespace string) err
} }
return nil return nil
} }
// TODO
// 1. periodic pull and pin, configurable time interval
// 2. logging summary results of each pull and pin
// 3. docker container with multi-stage build
// 4. kustomize deployment in wamblee-operators namespace (kubernetes-setup repo)