towards a structure with more dependency injection and uisng a moving window to see what pods were active in the given interval.

This commit is contained in:
Erik Brakkee 2025-03-02 17:41:25 +01:00
parent 891fdc990c
commit 2e537818fb
5 changed files with 103 additions and 30 deletions

12
cmd/fetcher/config.go Normal file
View File

@ -0,0 +1,12 @@
package main
import "time"
type Config struct {
KubernetesNamespace string
SocketPath string
ContainerdNamespace string
Nodename string
ReadyDuration time.Duration
includeControllerNodes bool
}

View File

@ -15,12 +15,15 @@ import (
) )
type Fetcher struct { type Fetcher struct {
KubernetesNamespace string config *Config
SocketPath string clientset *kubernetes.Clientset
ContainerdNamespace string }
Nodename string
ReadyDuration time.Duration func NewFetcher(config *Config, clientset *kubernetes.Clientset) *Fetcher {
includeControllerNodes bool return &Fetcher{
config: config,
clientset: clientset,
}
} }
func (fetcher *Fetcher) canonicalizeImageName(image string) string { func (fetcher *Fetcher) canonicalizeImageName(image string) string {
@ -76,7 +79,7 @@ func (fetcher *Fetcher) canonicalizeImageName(image string) string {
func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod, controllers map[string]bool) bool { func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod, controllers map[string]bool) bool {
ready := false ready := false
if !fetcher.includeControllerNodes { if !fetcher.config.includeControllerNodes {
klog.Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName) klog.Infof("Checking %s (%s)", pod.Name, pod.Spec.NodeName)
if _, ok := controllers[pod.Spec.NodeName]; ok { if _, ok := controllers[pod.Spec.NodeName]; ok {
return false return false
@ -84,7 +87,7 @@ func (fetcher *Fetcher) isReadyForSomeTime(pod *v1.Pod, controllers map[string]b
} }
for _, condition := range pod.Status.Conditions { for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
if time.Now().Sub(condition.LastTransitionTime.Time) > fetcher.ReadyDuration { if time.Now().Sub(condition.LastTransitionTime.Time) > fetcher.config.ReadyDuration {
ready = true ready = true
} }
} }
@ -99,7 +102,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin
controllers := fetcher.getControllerNames(clientset) controllers := fetcher.getControllerNames(clientset)
pods, err := clientset.CoreV1().Pods(fetcher.KubernetesNamespace).List(context.Background(), pods, err := clientset.CoreV1().Pods(fetcher.config.KubernetesNamespace).List(context.Background(),
metav1.ListOptions{}) metav1.ListOptions{})
if err != nil { if err != nil {
panic(err) panic(err)
@ -112,7 +115,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin
klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name) klog.V(3).Infof("%s/%s\n", pod.Namespace, pod.Name)
for _, container := range pod.Spec.InitContainers { for _, container := range pod.Spec.InitContainers {
klog.V(3).Infof(" %s\n", container.Image) klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == fetcher.Nodename { if pod.Spec.NodeName == fetcher.config.Nodename {
containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
} else { } else {
if fetcher.isReadyForSomeTime(&pod, controllers) { if fetcher.isReadyForSomeTime(&pod, controllers) {
@ -122,7 +125,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin
} }
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
klog.V(3).Infof(" %s\n", container.Image) klog.V(3).Infof(" %s\n", container.Image)
if pod.Spec.NodeName == fetcher.Nodename { if pod.Spec.NodeName == fetcher.config.Nodename {
containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true containersOnCurrentNode[fetcher.canonicalizeImageName(container.Image)] = true
} else { } else {
if fetcher.isReadyForSomeTime(&pod, controllers) { if fetcher.isReadyForSomeTime(&pod, controllers) {
@ -132,7 +135,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin
} }
} }
for container, _ := range containersOnCurrentNode { for container := range containersOnCurrentNode {
delete(containers, container) delete(containers, container)
} }
return containers return containers
@ -141,7 +144,7 @@ func (fetcher *Fetcher) getContainers(clientset *kubernetes.Clientset) map[strin
func (fetcher *Fetcher) pullAndPin() error { func (fetcher *Fetcher) pullAndPin() error {
// Create the image manager // Create the image manager
containerd, err := NewContainerd(fetcher.SocketPath, fetcher.ContainerdNamespace) containerd, err := NewContainerd(fetcher.config.SocketPath, fetcher.config.ContainerdNamespace)
if err != nil { if err != nil {
klog.Fatalf("Failed to create image manager: %v", err) klog.Fatalf("Failed to create image manager: %v", err)
} }
@ -151,10 +154,8 @@ func (fetcher *Fetcher) pullAndPin() error {
return err return err
} }
clientset, _ := GetKubernetesConnection() containers := fetcher.getContainers(fetcher.clientset)
for container := range containers {
containers := fetcher.getContainers(clientset)
for container, _ := range containers {
klog.V(3).Infof("Found container %s\n", container) klog.V(3).Infof("Found container %s\n", container)
} }

View File

@ -6,7 +6,7 @@ import (
"log" "log"
) )
func GetKubernetesConnection() (*kubernetes.Clientset, string) { func GetKubernetesConnection() *kubernetes.Clientset {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{} configOverrides := &clientcmd.ConfigOverrides{}
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
@ -21,9 +21,5 @@ func GetKubernetesConnection() (*kubernetes.Clientset, string) {
log.Panicln(err.Error()) log.Panicln(err.Error())
} }
namespace, _, err := kubeConfig.Namespace() return clientset
if err != nil {
log.Panicf("Could not get namespace")
}
return clientset, namespace
} }

View File

@ -12,7 +12,9 @@ func main() {
klogFlags := goflags.NewFlagSet("", goflags.PanicOnError) klogFlags := goflags.NewFlagSet("", goflags.PanicOnError)
klog.InitFlags(klogFlags) klog.InitFlags(klogFlags)
fetcher := Fetcher{} clientset := GetKubernetesConnection()
config := &Config{}
fetcher := NewFetcher(config, clientset)
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "kube-fetcher", Use: "kube-fetcher",
@ -23,21 +25,24 @@ images referenced in pods are made available on the local k8s node and pinned
so they don't get garbage collected'`, so they don't get garbage collected'`,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
err := fetcher.pullAndPin() err := fetcher.pullAndPin()
//watcher := Watcher{}
//watcher.WatchPods(clientset, config.KubernetesNamespace)
return err return err
}, },
} }
cmd.PersistentFlags().StringVar(&fetcher.KubernetesNamespace, "kubernetes-namespace", cmd.PersistentFlags().StringVar(&config.KubernetesNamespace, "kubernetes-namespace",
"", "Kubernetes containerdNamespace to inspect (default is all namespaces)") "", "Kubernetes containerdNamespace to inspect (default is all namespaces)")
cmd.PersistentFlags().StringVar(&fetcher.SocketPath, "socket", cmd.PersistentFlags().StringVar(&config.SocketPath, "socket",
"/run/containerd/containerd.sock", "Containerd socket") "/run/containerd/containerd.sock", "Containerd socket")
cmd.PersistentFlags().StringVar(&fetcher.ContainerdNamespace, "containerd-namespace", cmd.PersistentFlags().StringVar(&config.ContainerdNamespace, "containerd-namespace",
"k8s.io", "Containerd namespace to use") "k8s.io", "Containerd namespace to use")
cmd.PersistentFlags().StringVar(&fetcher.Nodename, "nodename", "", cmd.PersistentFlags().StringVar(&config.Nodename, "nodename", "",
"Kubernetes node name the fetcher is running on, it will only fetch images running on other nodes") "Kubernetes node name the fetcher is running on, it will only fetch images running on other nodes")
cmd.PersistentFlags().DurationVar(&fetcher.ReadyDuration, "ready-duration", cmd.PersistentFlags().DurationVar(&config.ReadyDuration, "ready-duration",
1*time.Hour, "Time a pod must be ready before its image will be fetched") 1*time.Hour, "Time a pod must be ready before its image will be fetched")
cmd.PersistentFlags().BoolVar(&fetcher.includeControllerNodes, "include-controllers", cmd.PersistentFlags().BoolVar(&config.includeControllerNodes, "include-controllers",
false, "Include controller nodes") false, "Include controller nodes")
cmd.Flags().AddGoFlagSet(klogFlags) cmd.Flags().AddGoFlagSet(klogFlags)

59
cmd/fetcher/watcher.go Normal file
View File

@ -0,0 +1,59 @@
package main
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
type Watcher struct {
}
func (watcher *Watcher) WatchPods(
clientset *kubernetes.Clientset,
namespace string) {
watchlist := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
"pods",
namespace,
fields.Everything(),
)
addOrUpdate := func(obj interface{}) {
pod := watcher.getPod(obj)
klog.Infof("Added/updated %s/%s\n", pod.Namespace, pod.Name)
}
options := cache.InformerOptions{
ListerWatcher: watchlist,
ObjectType: &corev1.Pod{},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: addOrUpdate,
UpdateFunc: func(_ any, obj any) {
addOrUpdate(obj)
},
DeleteFunc: func(obj any) {
pod := watcher.getPod(obj)
klog.Infof("Delete %s/%s\n", pod.Namespace, pod.Name)
},
},
ResyncPeriod: 0,
}
_, controller := cache.NewInformerWithOptions(options)
stop := make(chan struct{})
defer close(stop)
go controller.Run(stop)
select {}
}
func (watcher *Watcher) getPod(obj any) *corev1.Pod {
k8spod, ok := obj.(*corev1.Pod)
if !ok {
klog.Fatalf("Object of wrong type: %v", obj)
}
return k8spod
}