package main import ( "context" "fmt" containerd2 "git.wamblee.org/public/kube-fetcher/pkg/ctrd" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "os" "strings" "time" ) type Fetcher struct { clientset *kubernetes.Clientset config *Config watcher *Watcher } func NewFetcher(clientset *kubernetes.Clientset, config *Config, watcher *Watcher) *Fetcher { return &Fetcher{ config: config, clientset: clientset, watcher: watcher, } } func (fetcher *Fetcher) canonicalizeImageName(image string) string { parts := strings.Split(image, "/") if len(parts) < 1 { panic(fmt.Errorf("Could not disect image name '%s'", image)) } if len(parts) == 1 { parts = []string{"docker.io", "library", parts[0]} } if !strings.Contains(parts[len(parts)-1], ":") { parts[len(parts)-1] = parts[len(parts)-1] + ":latest" } registry := "" if strings.Contains(parts[0], ".") { registry = parts[0] parts = parts[1:] } else { registry = "docker.io" } if registry == "docker.io" && len(parts) == 1 { parts[0] = "library/" + parts[0] } return registry + "/" + strings.Join(parts, "/") } func (fetcher *Fetcher) wasReady(pod *v1.Pod, controllers map[string]bool) bool { if !fetcher.config.includeControllerNodes { klog.V(3).Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName) if _, ok := controllers[pod.Spec.NodeName]; ok { return false } } return time.Now().Sub(pod.CreationTimestamp.Time) >= fetcher.config.ReadyDuration } func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[string]bool { controllers := fetcher.getControllerNames(clientset) pods := fetcher.watcher.getPods() //pods, err := clientset.CoreV1().Pods(fetcher.config.KubernetesNamespace).List(context.Background(), // metav1.ListOptions{}) //if err != nil { // panic(err) //} containers := make(map[string]bool) containersOnCurrentNode := make(map[string]bool) for _, pod := range pods { klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name) for _, container := range pod.Spec.InitContainers { klog.V(3).Infof(" %s\n", container.Image) if pod.Spec.NodeName == fetcher.config.Nodename { containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true } else { if fetcher.wasReady(pod, controllers) { containers[fetcher.canonicalizeImageName(container.Image)] = true } } } for _, container := range pod.Spec.Containers { klog.V(3).Infof(" %s\n", container.Image) if pod.Spec.NodeName == fetcher.config.Nodename { containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true } else { if fetcher.wasReady(pod, controllers) { containers[fetcher.canonicalizeImageName(container.Image)] = true } } } } for container := range containersOnCurrentNode { delete(containers, container) } return containers } func (fetcher *Fetcher) pullAndPin(pullAll bool) error { nodeName := os.Getenv("NODE_NAME") if nodeName == "" { nodeName = "UNKNOWN" } // Create the image manager containerd, err := containerd2.NewContainerd(fetcher.config.SocketPath, fetcher.config.ContainerdNamespace) if err != nil { klog.Fatalf("Failed to create image manager: %v", err) } imgs, err := containerd.List() if err != nil { return err } containers := fetcher.getContainers(fetcher.clientset) for container := range containers { klog.V(3).Infof("Found container %s\n", container) } // unpin images that are not used for container, pinned := range imgs { if !containers[container] && pinned { klog.Infof("%s: Unpinning %s\n", nodeName, container) err := containerd.Unpin(container) if err != nil { klog.Warningf(" error: %v", err) } } } // Pull images that are used for container := range containers { if _, found := imgs[container]; !found || pullAll { tag := "" for registry, mirror := range fetcher.config.mirrors { if strings.HasPrefix(container, registry+"/") { tag = container container = mirror + container[len(registry):] } } klog.Infof("%s: Pulling %s\n", nodeName, container) err := containerd.Pull(container) if err != nil { klog.Warningf("error: %v", err) } if tag != "" { klog.Infof("%s: Tagging '%s' -> '%s'", nodeName, container, tag) err := containerd.Tag(container, tag) if err != nil { klog.Warningf("Could not tag '%s' -> '%s'", container, tag) } } } } imgs, err = containerd.List() if err != nil { return err } // Pin images that are used and present for container := range containers { if pinned, found := imgs[container]; found && !pinned { klog.Infof("%s: Pinning %s\n", nodeName, container) err := containerd.Pin(container) if err != nil { klog.Warningf(" error: %v", err) } } } return nil } func (fetcher *Fetcher) getControllerNames(clientset *kubernetes.Clientset) map[string]bool { nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ LabelSelector: "node-role.kubernetes.io/control-plane=,!node-role.kubernetes.io/master", }) if err != nil { fmt.Printf("Error listing nodes: %v\n", err) os.Exit(1) } nodeset := make(map[string]bool) for _, node := range nodes.Items { nodeset[node.Name] = true } return nodeset } // TODO // 1. periodic pull and pin, configurable time interval // 2. logging summary results of each pull and pin // 3. docker container with multi-stage build // 4. kustomize deployment in wamb lee-operators namespace (kubernetes-setup repo)