package main import ( "context" "fmt" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "os" "regexp" "strings" "time" ) type Fetcher struct { KubernetesNamespace string SocketPath string ContainerdNamespace string Nodename string ReadyDuration time.Duration includeControllerNodes bool } func (fetcher *Fetcher) canonicalizeImageName(image string) string { pattern := `^(?:(?P[a-zA-Z0-9][-a-zA-Z0-9.]*[a-zA-Z0-9](?::[0-9]+)?)/)?(?P(?:[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*(?:/[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*)*)?)?(?::(?P[\w][\w.-]{0,127}))?(?:@(?P[a-z][a-z0-9]*(?:[+.-][a-z][a-z0-9]*)*:[a-zA-Z0-9*+,-./:;=@_]{32,}))?$` re := regexp.MustCompile(pattern) matches := re.FindStringSubmatch(image) subexpNames := re.SubexpNames() if matches == nil { panic(fmt.Errorf("Invalid image reference: %s\n", image)) } result := make(map[string]string) for i, name := range subexpNames { if i != 0 && name != "" && i < len(matches) { result[name] = matches[i] } } klog.V(3).Infof("Image: %s\n", image) klog.V(3).Infof(" Registry: %s\n", result["registry"]) klog.V(3).Infof(" Repository: %s\n", result["repository"]) klog.V(3).Infof(" Tag: %s\n", result["tag"]) klog.V(3).Infof(" Digest: %s\n", result["digest"]) registry := result["registry"] repository := result["repository"] tag := result["tag"] digest := result["digest"] // Check if image has a tag if digest == "" && tag == "" { tag = "latest" } // Check if image has a host if registry == "" { registry = "docker.io" } // Handle the case when remainder doesn't specify library but it's not a docker.io official image if registry == "docker.io" && !strings.Contains(repository, "/") { repository = "library/" + repository } fullimage := registry + "/" + repository if tag != "" { fullimage += ":" + tag } if digest != "" { fullimage += "@" + digest } return fullimage } func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod, controllers map[string]bool) bool { ready := false if !fetcher.includeControllerNodes { klog.Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName) if _, ok := controllers[pod.Spec.NodeName]; ok { return false } } for _, condition := range pod.Status.Conditions { if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { if time.Now().Sub(condition.LastTransitionTime.Time) > fetcher.ReadyDuration { ready = true } } } if pod.DeletionTimestamp != nil { ready = false } return ready } func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[string]bool { controllers := fetcher.getControllerNames(clientset) pods, err := clientset.CoreV1().Pods(fetcher.KubernetesNamespace).List(context.Background(), metav1.ListOptions{}) if err != nil { panic(err) } containers := make(map[string]bool) containersOnCurrentNode := make(map[string]bool) for _, pod := range pods.Items { klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name) for _, container := range pod.Spec.InitContainers { klog.V(3).Infof(" %s\n", container.Image) if pod.Spec.NodeName == fetcher.Nodename { containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true } else { if fetcher.isReadyForSomeTime(&pod, controllers) { containers[fetcher.canonicalizeImageName(container.Image)] = true } } } for _, container := range pod.Spec.Containers { klog.V(3).Infof(" %s\n", container.Image) if pod.Spec.NodeName == fetcher.Nodename { containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true } else { if fetcher.isReadyForSomeTime(&pod, controllers) { containers[fetcher.canonicalizeImageName(container.Image)] = true } } } } for container, _ := range containersOnCurrentNode { delete(containers, container) } return containers } func (fetcher *Fetcher) pullAndPin() error { // Create the image manager containerd, err := NewContainerd(fetcher.SocketPath, fetcher.ContainerdNamespace) if err != nil { klog.Fatalf("Failed to create image manager: %v", err) } imgs, err := containerd.List() if err != nil { return err } clientset, _ := GetKubernetesConnection() containers := fetcher.getContainers(clientset) for container, _ := range containers { klog.V(3).Infof("Found container %s\n", container) } // unpin images that are not used for container, pinned := range imgs { if !containers[container] && pinned { klog.Infof("Unpinning %s\n", container) err := containerd.Unpin(container) if err != nil { klog.Warningf(" error: %v", err) } } } // Pull images that are used for container := range containers { if _, found := imgs[container]; !found { klog.Infof("Pulling %s\n", container) err := containerd.Pull(container) if err != nil { klog.Warningf("error: %v", err) } } } imgs, err = containerd.List() if err != nil { return err } // Pin images that are used and present for container := range containers { if pinned, found := imgs[container]; found && !pinned { klog.Infof("Pinning %s\n", container) err := containerd.Pin(container) if err != nil { klog.Warningf(" error: %v", err) } } } return nil } func (fetcher *Fetcher) getControllerNames(clientset *kubernetes.Clientset) map[string]bool { nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ LabelSelector: "node-role.kubernetes.io/control-plane=,!node-role.kubernetes.io/master", }) if err != nil { fmt.Printf("Error listing nodes: %v\n", err) os.Exit(1) } nodeset := make(map[string]bool) for _, node := range nodes.Items { nodeset[node.Name] = true } return nodeset } // TODO // 1. periodic pull and pin, configurable time interval // 2. logging summary results of each pull and pin // 3. docker container with multi-stage build // 4. kustomize deployment in wamb lee-operators namespace (kubernetes-setup repo)