kube-fetcher/cmd/fetcher/fetcher.go

195 lines
5.2 KiB
Go

package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"regexp"
"strings"
"time"
)
type Fetcher struct {
KubernetesNamespace string
SocketPath string
ContainerdNamespace string
Nodename string
ReadyDuration time.Duration
}
func (fetcher *Fetcher) canonicalizeImageName(image string) string {
pattern := `^(?:(?P<registry>[a-zA-Z0-9][-a-zA-Z0-9.]*[a-zA-Z0-9](?::[0-9]+)?)/)?(?P<repository>(?:[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*(?:/[a-z0-9]+(?:(?:[._]|__|[-]+)[a-z0-9]+)*)*)?)?(?::(?P<tag>[\w][\w.-]{0,127}))?(?:@(?P<digest>[a-z][a-z0-9]*(?:[+.-][a-z][a-z0-9]*)*:[a-zA-Z0-9*+,-./:;=@_]{32,}))?$`
re := regexp.MustCompile(pattern)
matches := re.FindStringSubmatch(image)
subexpNames := re.SubexpNames()
if matches == nil {
panic(fmt.Errorf("Invalid image reference: %s\n", image))
}
result := make(map[string]string)
for i, name := range subexpNames {
if i != 0 && name != "" && i < len(matches) {
result[name] = matches[i]
}
}
klog.V(3).Infof("Image: %s\n", image)
klog.V(3).Infof(" Registry: %s\n", result["registry"])
klog.V(3).Infof(" Repository: %s\n", result["repository"])
klog.V(3).Infof(" Tag: %s\n", result["tag"])
klog.V(3).Infof(" Digest: %s\n", result["digest"])
registry := result["registry"]
repository := result["repository"]
tag := result["tag"]
digest := result["digest"]
// Check if image has a tag
if digest == "" && tag == "" {
tag = "latest"
}
// Check if image has a host
if registry == "" {
registry = "docker.io"
}
// Handle the case when remainder doesn't specify library but it's not a docker.io official image
if registry == "docker.io" && !strings.Contains(repository, "/") {
repository = "library/" + repository
}
fullimage := registry + "/" + repository
if tag != "" {
fullimage += ":" + tag
}
if digest != "" {
fullimage += "@" + digest
}
return fullimage
}
func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod) bool {
ready := false
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
if time.Now().Sub(condition.LastTransitionTime.Time) > fetcher.ReadyDuration {
ready = true
}
}
}
if pod.DeletionTimestamp != nil {
ready = false
}
return ready
}
func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[string]bool {
pods, err := clientset.CoreV1().Pods(fetcher.KubernetesNamespace).List(context.Background(),
metav1.ListOptions{})
if err != nil {
panic(err)
}
containers := make(map[string]bool)
containersOnCurrentNode := make(map[string]bool)
for _, pod := range pods.Items {
klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name)
for _, container := range pod.Spec.InitContainers {
klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == fetcher.Nodename {
containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
} else {
if fetcher.isReadyForSomeTime(&pod) {
containers[fetcher.canonicalizeImageName(container.Image)] = true
}
}
}
for _, container := range pod.Spec.Containers {
klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == fetcher.Nodename {
containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
} else {
if fetcher.isReadyForSomeTime(&pod) {
containers[fetcher.canonicalizeImageName(container.Image)] = true
}
}
}
}
for container, _ := range containersOnCurrentNode {
delete(containers, container)
}
return containers
}
func (fetcher *Fetcher) pullAndPin() error {
// Create the image manager
containerd, err := NewContainerd(fetcher.SocketPath, fetcher.ContainerdNamespace)
if err != nil {
klog.Fatalf("Failed to create image manager: %v", err)
}
imgs, err := containerd.List()
if err != nil {
return err
}
clientset, _ := GetKubernetesConnection()
containers := fetcher.getContainers(clientset)
for container, _ := range containers {
klog.V(3).Infof("Found container %s\n", container)
}
// unpin images that are not used
for container, pinned := range imgs {
if !containers[container] && pinned {
klog.Infof("Unpinning %s\n", container)
err := containerd.Unpin(container)
if err != nil {
klog.Warningf(" error: %v", err)
}
}
}
// Pull images that are used
for container := range containers {
if _, found := imgs[container]; !found {
klog.Infof("Pulling %s\n", container)
err := containerd.Pull(container)
if err != nil {
klog.Warningf("error: %v", err)
}
}
}
imgs, err = containerd.List()
if err != nil {
return err
}
// Pin images that are used and present
for container := range containers {
if pinned, found := imgs[container]; found && !pinned {
klog.Infof("Pinning %s\n", container)
err := containerd.Pin(container)
if err != nil {
klog.Warningf(" error: %v", err)
}
}
}
return nil
}
// TODO
// 1. periodic pull and pin, configurable time interval
// 2. logging summary results of each pull and pin
// 3. docker container with multi-stage build
// 4. kustomize deployment in wamb lee-operators namespace (kubernetes-setup repo)