kube-fetcher/cmd/fetcher/containerd.go
2025-03-01 13:03:18 +01:00

184 lines
4.6 KiB
Go

package main
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
)
// Containerd implements the ContainerRuntime interface on top of a containerd
// client. Values are created with NewImageManager and released with Close.
type Containerd struct {
// client is the containerd API client used by every method.
client *containerd.Client
// ctx is the base context passed to each containerd call; NewImageManager
// builds it from context.Background() with the namespace attached.
ctx context.Context
// namespace is the containerd namespace name given to NewImageManager.
namespace string
}
// NewImageManager creates a containerd client for the daemon socket at
// socketPath and returns a Containerd whose calls are all scoped to namespace.
//
// The returned value owns the client; call Close when it is no longer needed.
// An error is returned, wrapping the client error, when the client cannot be
// created.
func NewImageManager(socketPath, namespace string) (*Containerd, error) {
	client, err := containerd.New(socketPath)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to containerd at %s: %w", socketPath, err)
	}

	return &Containerd{
		client: client,
		// Attach the namespace once so every later call made with ctx inherits it.
		ctx:       namespaces.WithNamespace(context.Background(), namespace),
		namespace: namespace,
	}, nil
}
// List reports every image returned by the containerd image service, flagging
// the ones that a lease marks as pinned.
//
// An image counts as pinned when at least one lease carries the
// "containerd.io/gc.ref.content.image" label with the image name as its value.
// The result is nil when containerd reports no images. Errors from listing
// images or leases are wrapped and returned.
func (m *Containerd) List() ([]Image, error) {
	imgs, err := m.client.ImageService().List(m.ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list images: %w", err)
	}

	allLeases, err := m.client.LeasesService().List(m.ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list leases: %w", err)
	}

	// Collect the image references named by lease labels into a set.
	pinned := make(map[string]struct{})
	for _, l := range allLeases {
		ref, ok := l.Labels["containerd.io/gc.ref.content.image"]
		if !ok {
			continue
		}
		pinned[ref] = struct{}{}
	}

	var out []Image
	for _, img := range imgs {
		_, isPinned := pinned[img.Name]
		out = append(out, Image{Name: img.Name, Pinned: isPinned})
	}
	return out, nil
}
// Pin creates a lease labelled with imageRef so that List reports the image as
// pinned and Unpin can find the lease again.
//
// The image must already exist in containerd; otherwise the lookup error is
// wrapped and returned and no lease is created. Every call creates a new lease
// with a fresh ID, so pinning the same image twice yields two leases.
//
// NOTE(review): the lease only carries a label; no resource is attached to it
// in this method. Confirm that this is sufficient to protect the image from
// garbage collection.
func (m *Containerd) Pin(imageRef string) error {
	// The ID combines a random token with a timestamp; it is not derived from
	// imageRef.
	leaseID := fmt.Sprintf("pin-%s-%d", generateID(), time.Now().UnixNano())

	// Validate that the image exists before recording a lease for it.
	if _, err := m.client.ImageService().Get(m.ctx, imageRef); err != nil {
		return fmt.Errorf("failed to get image %s: %w", imageRef, err)
	}

	labels := map[string]string{"containerd.io/gc.ref.content.image": imageRef}
	if _, err := m.client.LeasesService().Create(
		m.ctx,
		leases.WithID(leaseID),
		leases.WithLabels(labels),
	); err != nil {
		return fmt.Errorf("failed to create lease: %w", err)
	}
	return nil
}
// Unpin deletes every lease whose "containerd.io/gc.ref.content.image" label
// equals imageRef.
//
// It stops at the first deletion failure and returns that error wrapped with
// the lease ID; leases deleted before the failure stay deleted. An error is
// also returned when no lease references imageRef.
func (m *Containerd) Unpin(imageRef string) error {
	leaseSvc := m.client.LeasesService()

	all, err := leaseSvc.List(m.ctx)
	if err != nil {
		return fmt.Errorf("failed to list leases: %w", err)
	}

	removed := 0
	for _, l := range all {
		// Skip leases that carry no image label or point at another image.
		if ref, ok := l.Labels["containerd.io/gc.ref.content.image"]; !ok || ref != imageRef {
			continue
		}
		if err := leaseSvc.Delete(m.ctx, l); err != nil {
			return fmt.Errorf("failed to delete lease %s: %w", l.ID, err)
		}
		removed++
	}

	if removed == 0 {
		return fmt.Errorf("no lease found for image %s", imageRef)
	}
	return nil
}
// Pull fetches imageRef through the containerd client, requesting that the
// image be unpacked after download. The pulled image handle is discarded; only
// success or failure is reported, with failures wrapped and returned.
func (m *Containerd) Pull(imageRef string) error {
	if _, err := m.client.Pull(
		m.ctx,
		imageRef,
		// Empty platform string: presumably selects containerd's default
		// platform; confirm against the client documentation.
		containerd.WithPlatform(""),
		containerd.WithPullUnpack,
	); err != nil {
		return fmt.Errorf("failed to pull image %s: %w", imageRef, err)
	}
	return nil
}
// Remove deletes the image named imageRef from containerd, waiting for the
// synchronous delete to finish before returning.
//
// It returns an "image ... not found" error when containerd reports that the
// image does not exist, and a "failed to delete image" error for any other
// failure. Both wrap the underlying error, so callers can still classify it
// with errdefs.IsNotFound or errors.Is.
func (m *Containerd) Remove(imageRef string) error {
	// Delete directly rather than looking the image up first: a preliminary
	// Get costs an extra round trip and leaves a window in which the image can
	// disappear between the check and the delete.
	err := m.client.ImageService().Delete(m.ctx, imageRef, images.SynchronousDelete())
	switch {
	case err == nil:
		return nil
	case errdefs.IsNotFound(err):
		return fmt.Errorf("image %s not found: %w", imageRef, err)
	default:
		return fmt.Errorf("failed to delete image %s: %w", imageRef, err)
	}
}
// Close closes the underlying containerd client and returns whatever error the
// client reports.
func (m *Containerd) Close() error {
return m.client.Close()
}
// generateID returns a fresh identifier: 16 bytes from crypto/rand rendered as
// 32 lowercase hexadecimal characters.
//
// If the random source fails, it falls back to the current Unix time in
// nanoseconds written in decimal (with any minus sign removed), so the
// fallback value has a different length and alphabet than the normal one.
func generateID() string {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err == nil {
		return hex.EncodeToString(raw[:])
	}

	// Random source unavailable: derive the ID from the clock instead.
	stamp := fmt.Sprint(time.Now().UnixNano())
	return strings.ReplaceAll(stamp, "-", "")
}