diff --git a/Dockerfile b/Dockerfile index 39eade5..8769be2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,8 @@ ENV GOTOOLCHAIN=auto COPY go.mod go.sum /opt/fetcher/ RUN go mod download -COPY cmd /opt/fetcher/cmd/ +COPY cmd /opt/fetcher/cmd/ +COPY pkg /opt/fetcher/pkg/ RUN go build -o bin ./cmd/... RUN find . -type f diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..0b2cf38 --- /dev/null +++ b/TODO.md @@ -0,0 +1,52 @@ + + +labels on image manifest + +# Getting the image digest +root@baboon:~# ctr -n k8s.io image ls | grep cat.wamblee.org/converge:1.0.0 +cat.wamblee.org/converge:1.0.0 +application/vnd.docker.distribution.manifest.v2+json sha256:c08c336462955fcf4a63357dd95b1c5a37f1fbf2ca96e476d38968c39e782b13 39.2 MiB linux/amd64 + +# getting the manifest + +# ctr -n k8s.io content get sha256:c08c336462955fcf4a63357dd95b1c5a37f1fbf2ca96e476d38968c39e782b13 +{ + "schemaVersion": 2, + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "config": { + "mediaType": "application/vnd.docker.container.image.v1+json", + "size": 1573, + "digest": "sha256:f73adff49e6fb8d94c55cd4030e5045e854de284418e737493fc22727f6b4e44" + }, + "layers": [ + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 7748860, + "digest": "sha256:be3c1309dc5ae93749438ca02ae64fae75cc04bf01507672f7ee0fe5dcc06c74" + }, + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 32034281, + "digest": "sha256:72c349c372c463b95dca722c644359550949b2824852f02f1cf12b8b4b03bb0e" + }, + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 1304341, + "digest": "sha256:bf7da15860525c2cba9a6639846b5ce958c373cdae44ca6f704af406703e52d5" + } + ] +} + +# blob label? 
+containerd.io/gc.ref.snapshot.overlayfs=sha256:a7800a8e9dc0e1b7141ee372c7a0bad3d34cd5d5b061a9cc41d928e331578e0e + + +# ctr -n k8s.io content ls | grep sha256:a7800a8e9dc0e1b7141ee372c7a0bad3d34cd5d5b061a9cc41d928e331578e0e +sha256:f73adff49e6fb8d94c55cd4030e5045e854de284418e737493fc22727f6b4e44 1.573kB 5 days containerd.io/gc.ref.snapshot.overlayfs=sha256:a7800a8e9dc0e1b7141ee372c7a0bad3d34cd5d5b061a9cc41d928e331578e0e,containerd.io/distribution.source.cat.wamblee.org=converge + + +# ctr -n k8s.io content ls | grep f73adff49e6fb8d94c55cd4030e5045e854de284418e737493fc22727f6b4e44 +sha256:c08c336462955fcf4a63357dd95b1c5a37f1fbf2ca96e476d38968c39e782b13 951B 5 days containerd.io/gc.ref.content.l.2=sha256:bf7da15860525c2cba9a6639846b5ce958c373cdae44ca6f704af406703e52d5,containerd.io/gc.ref.content.l.1=sha256:72c349c372c463b95dca722c644359550949b2824852f02f1cf12b8b4b03bb0e,containerd.io/gc.ref.content.l.0=sha256:be3c1309dc5ae93749438ca02ae64fae75cc04bf01507672f7ee0fe5dcc06c74,containerd.io/gc.ref.content.config=sha256:f73adff49e6fb8d94c55cd4030e5045e854de284418e737493fc22727f6b4e44,containerd.io/distribution.source.cat.wamblee.org=converge +sha256:f73adff49e6fb8d94c55cd4030e5045e854de284418e737493fc22727f6b4e44 1.573kB 5 days containerd.io/gc.ref.snapshot.overlayfs=sha256:a7800a8e9dc0e1b7141ee372c7a0bad3d34cd5d5b061a9cc41d928e331578e0e,containerd.io/distribution.source.cat.wamblee.org=converge + +https://www.mo4tech.com/revelation-containerd-image-file-loss-problem-is-caused-by-image-generation.html \ No newline at end of file diff --git a/cmd/ctrutil/config.go b/cmd/ctrutil/config.go new file mode 100644 index 0000000..36033fb --- /dev/null +++ b/cmd/ctrutil/config.go @@ -0,0 +1,6 @@ +package main + +type Config struct { + SocketPath string + ContainerdNamespace string +} diff --git a/cmd/ctrutil/main.go b/cmd/ctrutil/main.go new file mode 100644 index 0000000..0baf3d0 --- /dev/null +++ b/cmd/ctrutil/main.go @@ -0,0 +1,120 @@ +package main + +import ( + goflags "flag" 
+ "fmt" + "git.wamblee.org/public/kube-fetcher/pkg/ctrd" + "github.com/spf13/cobra" + "io" + "k8s.io/klog/v2" + "os" +) + +func main() { + klogFlags := goflags.NewFlagSet("", goflags.PanicOnError) + klog.InitFlags(klogFlags) + + config := &Config{} + + var runtime *ctrd.Containerd + var err error + + cmd := &cobra.Command{ + Use: "ctrutil", + Short: "Containerd utility", + Long: ` +Containerd for working with images as they are pulled by the kubelet.`, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + runtime, err = ctrd.NewContainerd(config.SocketPath, + config.ContainerdNamespace) + if err != nil { + return err + } + return nil + }, + } + + pull := &cobra.Command{ + Use: "pull", + Short: "pull image", + Long: ` +Pull image`, + RunE: func(cmd *cobra.Command, args []string) error { + for _, image := range args { + klog.Infof("Pulling '%s'", image) + err := runtime.Pull(image) + if err != nil { + return fmt.Errorf("Cannot pull '%s': %v", image, err) + } + } + return nil + }, + } + cmd.AddCommand(pull) + + blob := &cobra.Command{ + Use: "blob", + Short: "blob manipulation", + Long: ` +Containerd for working with images as they are pulled by the kubelet.`, + } + cmd.AddCommand(blob) + + blobget := &cobra.Command{ + Use: "get", + Short: "get blob data", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 1 { + return fmt.Errorf("Expected blob digest as argument") + } + reader, err := runtime.GetBlob(args[0]) + if err != nil { + return err + } + _, err = io.Copy(os.Stdout, reader) + return err + }, + } + blob.AddCommand(blobget) + + blobmeta := &cobra.Command{ + Use: "meta", + Short: "get blob metadata", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 1 { + return fmt.Errorf("Expected blob digest as argument") + } + info, err := runtime.GetBlobMeta(args[0]) + if err != nil { + return err + } + fmt.Printf("size: %v\n", info.Size) + fmt.Printf("digest: %v\n", info.Digest) + fmt.Printf("created: %v\n", 
info.CreatedAt) + fmt.Printf("updated: %v\n", info.UpdatedAt) + for k, v := range info.Labels { + fmt.Printf("label: %s=%s\n", k, v) + } + + return nil + }, + } + blob.AddCommand(blobmeta) + + if 1 > 2 { + runtime.List() + } + + cmd.PersistentFlags().StringVar(&config.SocketPath, "socket", + "/run/containerd/containerd.sock", "Containerd socket") + cmd.PersistentFlags().StringVar(&config.ContainerdNamespace, "runtime-namespace", + "k8s.io", "Containerd namespace to use") + cmd.Flags().AddGoFlagSet(klogFlags) + + err = cmd.Execute() + if err != nil { + klog.Errorf("Error: %v", err) + os.Exit(1) + } + +} diff --git a/cmd/fetcher/config.go b/cmd/fetcher/config.go index 55416f8..94878ba 100644 --- a/cmd/fetcher/config.go +++ b/cmd/fetcher/config.go @@ -3,6 +3,7 @@ package main import "time" type Config struct { + InitialPullAll bool PollInterval time.Duration KubernetesNamespace string SocketPath string @@ -11,4 +12,5 @@ type Config struct { ReadyDuration time.Duration includeControllerNodes bool monitoringWindowSize time.Duration + mirrors map[string]string } diff --git a/cmd/fetcher/fetcher.go b/cmd/fetcher/fetcher.go index 154fa64..fc91bc3 100644 --- a/cmd/fetcher/fetcher.go +++ b/cmd/fetcher/fetcher.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + containerd2 "git.wamblee.org/public/kube-fetcher/pkg/ctrd" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -108,7 +109,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin return containers } -func (fetcher *Fetcher) pullAndPin() error { +func (fetcher *Fetcher) pullAndPin(pullAll bool) error { nodeName := os.Getenv("NODE_NAME") if nodeName == "" { @@ -116,7 +117,7 @@ func (fetcher *Fetcher) pullAndPin() error { } // Create the image manager - containerd, err := NewContainerd(fetcher.config.SocketPath, fetcher.config.ContainerdNamespace) + containerd, err := containerd2.NewContainerd(fetcher.config.SocketPath, 
fetcher.config.ContainerdNamespace) if err != nil { klog.Fatalf("Failed to create image manager: %v", err) } @@ -144,12 +145,27 @@ func (fetcher *Fetcher) pullAndPin() error { // Pull images that are used for container := range containers { - if _, found := imgs[container]; !found { + if _, found := imgs[container]; !found || pullAll { + tag := "" + for registry, mirror := range fetcher.config.mirrors { + if strings.HasPrefix(container, registry+"/") { + tag = container + container = mirror + container[len(registry):] + break // first matching registry wins; never re-match the rewritten name + } + } klog.Infof("%s: Pulling %s\n", nodeName, container) err := containerd.Pull(container) if err != nil { klog.Warningf("error: %v", err) } + if tag != "" { + klog.Infof("%s: Tagging '%s' -> '%s'", nodeName, container, tag) + err := containerd.Tag(container, tag) + if err != nil { + klog.Warningf("Could not tag '%s' -> '%s': %v", container, tag, err) + } + } } } diff --git a/cmd/fetcher/main.go b/cmd/fetcher/main.go index cbfa39e..9997ec4 100644 --- a/cmd/fetcher/main.go +++ b/cmd/fetcher/main.go @@ -2,6 +2,7 @@ package main import ( goflags "flag" + "git.wamblee.org/public/kube-fetcher/pkg/support" "github.com/spf13/cobra" "k8s.io/klog/v2" "os" @@ -12,7 +13,7 @@ func main() { klogFlags := goflags.NewFlagSet("", goflags.PanicOnError) klog.InitFlags(klogFlags) - clientset := GetKubernetesConnection() + clientset := support.GetKubernetesConnection() config := &Config{} cmd := &cobra.Command{ @@ -23,6 +24,7 @@ Queries k8s for all running pods and makes sure that all images referenced in pods are made available on the local k8s node and pinned so they don't get garbage collected'`, RunE: func(cmd *cobra.Command, args []string) error { + serializer := make(chan func()) go func() { for action := range serializer { @@ -32,13 +34,15 @@ so they don't get garbage collected'`, watcher := NewWatcher(clientset, config.monitoringWindowSize, config.KubernetesNamespace, serializer) fetcher := NewFetcher(clientset, config, watcher) + // Initial pass before the first tick; pulls every image when --initial-pull-all is set + 
fetcher.pullAndPin(config.InitialPullAll) ticker := time.NewTicker(config.PollInterval) for { select { case <-ticker.C: serializer <- func() { klog.V(3).Infof("Fetcher.pullAndPin") - fetcher.pullAndPin() + fetcher.pullAndPin(false) } } } @@ -61,6 +65,10 @@ so they don't get garbage collected'`, 6*time.Hour, "Monitoring window to see what pods were active") cmd.PersistentFlags().DurationVar(&config.PollInterval, "poll-interval", 1*time.Minute, "Poll interval for checking whether to pull images. ") + cmd.PersistentFlags().StringToStringVar(&config.mirrors, + "mirror", make(map[string]string), "Specify registry mirror in the form registry=mirror, e.g. docker.io=my.mirror. The option can be repeated.") + cmd.PersistentFlags().BoolVar(&config.InitialPullAll, "initial-pull-all", + false, "Initially pull all images, this can be useful for populating a caching proxy.") cmd.Flags().AddGoFlagSet(klogFlags) err := cmd.Execute() diff --git a/go.mod b/go.mod index 1b253e7..e837c16 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,10 @@ go 1.24.0 require ( github.com/containerd/containerd v1.7.26 + github.com/opencontainers/go-digest v1.0.0 + github.com/opencontainers/image-spec v1.1.0 github.com/spf13/cobra v1.9.1 + k8s.io/api v0.32.2 k8s.io/apimachinery v0.32.2 k8s.io/client-go v0.32.2 k8s.io/klog/v2 v2.130.1 @@ -57,8 +60,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/opencontainers/go-digest v1.0.0 // indirect - github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runtime-spec v1.2.1 // indirect github.com/opencontainers/selinux v1.11.1 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -85,7 +86,6 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.32.2 // indirect 
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 // indirect k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect diff --git a/helm/templates/daemonset.yaml b/helm/templates/daemonset.yaml index 9f0164a..878a6f9 100644 --- a/helm/templates/daemonset.yaml +++ b/helm/templates/daemonset.yaml @@ -27,8 +27,15 @@ spec: privileged: true runAsUser: 0 args: - - --ready-duration=1m - - --v=3 + - --ready-duration={{ .Values.readyDuration }} + - --v={{ .Values.logLevel }} + {{- if .Values.initialPullAll }} + - --initial-pull-all + {{- end }} + {{- range $mirror := .Values.mirrors }} + - --mirror + - {{ $mirror.registry }}={{ $mirror.mirror }} + {{- end }} volumeMounts: - mountPath: /run/containerd/containerd.sock name: containerd-sock @@ -36,4 +43,4 @@ spec: - name: containerd-sock hostPath: path: /run/containerd/containerd.sock - type: Socket \ No newline at end of file + type: Socket diff --git a/helm/values.yaml b/helm/values.yaml index 8b13789..a88e0fc 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -1 +1,19 @@ + +logLevel: 2 +readyDuration: 1m +initialPullAll: true + +mirrors: + - registry: docker.io + mirror: wharf.wamblee.org + - registry: gcr.io + mirror: wharf.wamblee.org + - registry: k8s.gcr.io + mirror: wharf.wamblee.org + - registry: quay.io + mirror: wharf.wamblee.org + - registry: ghcr.io + mirror: wharf.wamblee.org + - registry: registry.k8s.io + mirror: wharf.wamblee.org \ No newline at end of file diff --git a/cmd/fetcher/containerd.go b/pkg/ctrd/containerd.go similarity index 79% rename from cmd/fetcher/containerd.go rename to pkg/ctrd/containerd.go index b8299ee..aa53500 100644 --- a/cmd/fetcher/containerd.go +++ b/pkg/ctrd/containerd.go @@ -1,4 +1,4 @@ -package main +package ctrd import ( "context" @@ -6,10 +6,14 @@ import ( "encoding/base64" "fmt" "github.com/containerd/containerd" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" 
"github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "io" "k8s.io/klog/v2" ) @@ -158,6 +162,28 @@ func (m *Containerd) Pull(imageRef string) error { return nil } +func (m *Containerd) Tag(imageFrom, imageTo string) error { + sourceImage, err := m.client.GetImage(m.ctx, imageFrom) + if err != nil { + return fmt.Errorf("Failed to get source image: %v", err) + } + + // Create a new image with the target name that references the same content + newImage := images.Image{ + Name: imageTo, + Target: sourceImage.Target(), + } + + // best effort, remove old tag + _ = m.Remove(imageTo) + + // Create the new tag + if _, err := m.client.ImageService().Create(m.ctx, newImage); err != nil { + return fmt.Errorf("Failed to create new image tag: %v", err) + } + return nil +} + // Remove deletes an image func (m *Containerd) Remove(imageRef string) error { // Get the image @@ -191,3 +217,24 @@ func generateID(image string) string { md5Base64 := base64.StdEncoding.EncodeToString(md5Hash[:]) return md5Base64 } + +func (m *Containerd) GetBlob(hash string) (io.Reader, error) { + store := m.client.ContentStore() + descriptor := ocispec.Descriptor{ + Digest: digest.Digest(hash), + } + reader, err := store.ReaderAt(m.ctx, descriptor) + if err != nil { + return nil, err + } + return content.NewReader(reader), nil +} + +func (m *Containerd) GetBlobMeta(hash string) (content.Info, error) { + store := m.client.ContentStore() + info, err := store.Info(m.ctx, digest.Digest(hash)) + if err != nil { + return content.Info{}, err + } + return info, nil +} diff --git a/cmd/fetcher/container_runtime.go b/pkg/runtime/container_runtime.go similarity index 86% rename from cmd/fetcher/container_runtime.go rename to pkg/runtime/container_runtime.go index cdfcb84..bbda4d0 100644 --- a/cmd/fetcher/container_runtime.go +++ 
b/pkg/runtime/container_runtime.go @@ -1,4 +1,4 @@ -package main +package runtime // ContainerRuntime defines the interface for managing containerd images type ContainerRuntime interface { @@ -7,5 +7,6 @@ type ContainerRuntime interface { Pin(imageRef string) error Unpin(imageRef string) error Pull(imageRef string) error + Tag(imageFrom, imageTo string) error Remove(imageRef string) error } diff --git a/cmd/fetcher/kubernetes.go b/pkg/support/kubernetes.go similarity index 97% rename from cmd/fetcher/kubernetes.go rename to pkg/support/kubernetes.go index 3877d5a..5149aa5 100644 --- a/cmd/fetcher/kubernetes.go +++ b/pkg/support/kubernetes.go @@ -1,4 +1,4 @@ -package main +package support import ( "k8s.io/client-go/kubernetes"