kube-fetcher/cmd/fetcher/watcher.go
Erik Brakkee e8d4adaf53 Periodically check for pods now.
Also deal with resources deleted within the time interval.
Ensure that new pods are not immediately synced on other nodes, to avoid pull rate limits caused by this code.
2025-03-02 19:47:44 +01:00

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)
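
// Watcher keeps an in-memory view of the pods that were recently active in a
// namespace: pods that are ready now, plus pods deleted less than
// monitoringWindow ago.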
type Watcher struct {
	monitoringWindow time.Duration
	// Pods that were active in the given window. This map is pruned of
	// entries that have not been running for some time.
	// Map of Pod.metadata.uid to pod.
	pods       map[types.UID]*corev1.Pod
	serializer chan<- func()
}
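
// NewWatcher builds a Watcher and immediately starts watching pods in the
// given namespace. All mutations of the pods map are sent as functions over
// the serializer channel, so a single consumer can execute them one at a time.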
func NewWatcher(clientset *kubernetes.Clientset, monitoringWindow time.Duration, namespace string, serializer chan<- func()) *Watcher {
	watcher := &Watcher{
		monitoringWindow: monitoringWindow,
		pods:             make(map[types.UID]*corev1.Pod),
		serializer:       serializer,
	}
	watcher.watchPods(clientset, namespace)
	return watcher
}
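
// getPods returns the pods seen within the monitoring window and, as a side
// effect, prunes entries whose deletion lies further in the past than the
// window. Like all access to the pods map, it is meant to run via the
// serializer.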
func (watcher *Watcher) getPods() []*corev1.Pod {
	pods := make(map[types.UID]*corev1.Pod)
	klog.V(3).Infof("PODS %v", watcher.pods)
	res := make([]*corev1.Pod, 0)
	for uid, pod := range watcher.pods {
		klog.V(3).Infof("Checking pod %s/%s\n", pod.Namespace, pod.Name)
		// Keep a pod if it has not been deleted, or if its deletion still
		// falls within the monitoring window.
		if pod.DeletionTimestamp == nil ||
			pod.DeletionTimestamp.IsZero() ||
			time.Since(pod.DeletionTimestamp.Time) <= watcher.monitoringWindow {
			klog.V(3).Infof("Pod found in monitoring window %s/%s\n",
				pod.Namespace, pod.Name)
			pods[uid] = pod
			res = append(res, pod)
		}
	}
	watcher.pods = pods
	return res
}
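
// watchPods wires up a client-go informer on pods. Ready pods are (re)recorded
// on add and update events; on delete events only the deletion time is
// stamped, so the pod stays visible until the monitoring window has passed.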
func (watcher *Watcher) watchPods(
	clientset *kubernetes.Clientset,
	namespace string) {
	watchlist := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(),
		"pods",
		namespace,
		fields.Everything(),
	)
	addOrUpdate := func(obj any) {
		watcher.serializer <- func() {
			pod := watcher.getPod(obj)
			// Only add the pod if it is ready: skip it while the PodReady
			// condition is anything other than true.
			for _, condition := range pod.Status.Conditions {
				if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
					return
				}
			}
			klog.V(3).Infof("Added/updated %s/%s\n", pod.Namespace, pod.Name)
			watcher.pods[pod.UID] = pod
		}
	}
	options := cache.InformerOptions{
		ListerWatcher: watchlist,
		ObjectType:    &corev1.Pod{},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: addOrUpdate,
			UpdateFunc: func(_ any, obj any) {
				addOrUpdate(obj)
			},
			DeleteFunc: func(obj any) {
				watcher.serializer <- func() {
					pod := watcher.getPod(obj)
					klog.V(3).Infof("Delete %s/%s\n", pod.Namespace, pod.Name)
					// Stamp the deletion time on the tracked pod (if any) so
					// it remains visible for the monitoring window.
					if tracked, ok := watcher.pods[pod.UID]; ok {
						tracked.DeletionTimestamp = &metav1.Time{Time: time.Now()}
					}
				}
			},
		},
		ResyncPeriod: 0,
	}
	_, controller := cache.NewInformerWithOptions(options)
	// The stop channel is deliberately never closed: a deferred close here
	// would stop the informer as soon as this function returns.
	stop := make(chan struct{})
	go controller.Run(stop)
	// Wait for the cache to sync.
	if !cache.WaitForCacheSync(stop, controller.HasSynced) {
		panic(fmt.Errorf("failed to sync cache"))
	}
}
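
// getPod extracts the pod from an informer event object.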
func (watcher *Watcher) getPod(obj any) *corev1.Pod {
	// Delete events can deliver a tombstone instead of the pod itself.
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}
	k8spod, ok := obj.(*corev1.Pod)
	if !ok {
		klog.Fatalf("Object of wrong type: %v", obj)
	}
	return k8spod
}