kube-fetcher/cmd/fetcher/fetcher.go
Erik Brakkee be5ceb1ee5 logging node name for important operations
terminating application when watching pods stops
Dockerfile and docker compose file added.
helm chart added
2025-03-02 23:33:17 +01:00

194 lines
4.9 KiB
Go

package main
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"os"
"strings"
"time"
)
type Fetcher struct {
clientset *kubernetes.Clientset
config *Config
watcher *Watcher
}
func NewFetcher(clientset *kubernetes.Clientset, config *Config, watcher *Watcher) *Fetcher {
return &Fetcher{
config: config,
clientset: clientset,
watcher: watcher,
}
}
func (fetcher *Fetcher) canonicalizeImageName(image string) string {
parts := strings.Split(image, "/")
if len(parts) < 1 {
panic(fmt.Errorf("Could not disect image name '%s'", image))
}
if len(parts) == 1 {
parts = []string{"docker.io", "library", parts[0]}
}
if !strings.Contains(parts[len(parts)-1], ":") {
parts[len(parts)-1] = parts[len(parts)-1] + ":latest"
}
registry := ""
if strings.Contains(parts[0], ".") {
registry = parts[0]
parts = parts[1:]
} else {
registry = "docker.io"
}
if registry == "docker.io" && len(parts) == 1 {
parts[0] = "library/" + parts[0]
}
return registry + "/" + strings.Join(parts, "/")
}
func (fetcher *Fetcher) wasReady(pod *v1.Pod, controllers map[string]bool) bool {
if !fetcher.config.includeControllerNodes {
klog.V(3).Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName)
if _, ok := controllers[pod.Spec.NodeName]; ok {
return false
}
}
return time.Now().Sub(pod.CreationTimestamp.Time) >= fetcher.config.ReadyDuration
}
func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[string]bool {
controllers := fetcher.getControllerNames(clientset)
pods := fetcher.watcher.getPods()
//pods, err := clientset.CoreV1().Pods(fetcher.config.KubernetesNamespace).List(context.Background(),
// metav1.ListOptions{})
//if err != nil {
// panic(err)
//}
containers := make(map[string]bool)
containersOnCurrentNode := make(map[string]bool)
for _, pod := range pods {
klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name)
for _, container := range pod.Spec.InitContainers {
klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == fetcher.config.Nodename {
containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
} else {
if fetcher.wasReady(pod, controllers) {
containers[fetcher.canonicalizeImageName(container.Image)] = true
}
}
}
for _, container := range pod.Spec.Containers {
klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == fetcher.config.Nodename {
containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
} else {
if fetcher.wasReady(pod, controllers) {
containers[fetcher.canonicalizeImageName(container.Image)] = true
}
}
}
}
for container := range containersOnCurrentNode {
delete(containers, container)
}
return containers
}
func (fetcher *Fetcher) pullAndPin() error {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
nodeName = "UNKNOWN"
}
// Create the image manager
containerd, err := NewContainerd(fetcher.config.SocketPath, fetcher.config.ContainerdNamespace)
if err != nil {
klog.Fatalf("Failed to create image manager: %v", err)
}
imgs, err := containerd.List()
if err != nil {
return err
}
containers := fetcher.getContainers(fetcher.clientset)
for container := range containers {
klog.V(3).Infof("Found container %s\n", container)
}
// unpin images that are not used
for container, pinned := range imgs {
if !containers[container] && pinned {
klog.Infof("%s: Unpinning %s\n", nodeName, container)
err := containerd.Unpin(container)
if err != nil {
klog.Warningf(" error: %v", err)
}
}
}
// Pull images that are used
for container := range containers {
if _, found := imgs[container]; !found {
klog.Infof("%s: Pulling %s\n", nodeName, container)
err := containerd.Pull(container)
if err != nil {
klog.Warningf("error: %v", err)
}
}
}
imgs, err = containerd.List()
if err != nil {
return err
}
// Pin images that are used and present
for container := range containers {
if pinned, found := imgs[container]; found && !pinned {
klog.Infof("%s: Pinning %s\n", nodeName, container)
err := containerd.Pin(container)
if err != nil {
klog.Warningf(" error: %v", err)
}
}
}
return nil
}
func (fetcher *Fetcher) getControllerNames(clientset *kubernetes.Clientset) map[string]bool {
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: "node-role.kubernetes.io/control-plane=,!node-role.kubernetes.io/master",
})
if err != nil {
fmt.Printf("Error listing nodes: %v\n", err)
os.Exit(1)
}
nodeset := make(map[string]bool)
for _, node := range nodes.Items {
nodeset[node.Name] = true
}
return nodeset
}
// TODO
// 1. periodic pull and pin, configurable time interval
// 2. logging summary results of each pull and pin
// 3. docker container with multi-stage build
// 4. kustomize deployment in wamb lee-operators namespace (kubernetes-setup repo)