kube-fetcher/cmd/fetcher/watcher.go

118 lines
3.1 KiB
Go

package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"time"
)
type Watcher struct {
monitoringWindow time.Duration
// Pods that were active in the given window. This list is pruned
// for entries that have not been running for some time.
// Map of Pod.metadata.uid to pod
pods map[types.UID]*corev1.Pod
serializer chan<- func()
}
func NewWatcher(clientset *kubernetes.Clientset, monitoringWindow time.Duration, namespace string, serializer chan<- func()) *Watcher {
watcher := &Watcher{
monitoringWindow: monitoringWindow,
pods: make(map[types.UID]*corev1.Pod),
serializer: serializer,
}
watcher.watchPods(clientset, namespace)
return watcher
}
func (watcher *Watcher) getPods() []*corev1.Pod {
pods := make(map[types.UID]*corev1.Pod)
klog.V(3).Infof("PODS %v", pods)
res := make([]*corev1.Pod, 0)
for uid, pod := range watcher.pods {
klog.V(3).Infof("Checking pod %s/%s\n", pod.Namespace, pod.Name)
if pod.DeletionTimestamp == nil ||
pod.DeletionTimestamp.IsZero() ||
time.Now().Sub(pod.DeletionTimestamp.Time) <= watcher.monitoringWindow {
klog.V(3).Infof("Pod found in moving windows %s/%s\n",
pod.Namespace, pod.Name)
pods[uid] = pod
res = append(res, pod)
}
}
watcher.pods = pods
return res
}
func (watcher *Watcher) watchPods(
clientset *kubernetes.Clientset,
namespace string) {
watchlist := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
"pods",
namespace,
fields.Everything(),
)
addOrUpdate := func(obj interface{}) {
watcher.serializer <- func() {
pod := watcher.getPod(obj)
// only add the pod if it all its containers were ready
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
return
}
}
klog.V(3).Infof("Added/updated %s/%s\n", pod.Namespace, pod.Name)
watcher.pods[pod.UID] = pod
}
}
options := cache.InformerOptions{
ListerWatcher: watchlist,
ObjectType: &corev1.Pod{},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: addOrUpdate,
UpdateFunc: func(_ any, obj any) {
addOrUpdate(obj)
},
DeleteFunc: func(obj any) {
watcher.serializer <- func() {
pod := watcher.getPod(obj)
klog.V(3).Infof("Delete %s/%s\n", pod.Namespace, pod.Name)
now := metav1.NewTime(time.Now())
watcher.pods[pod.UID] = pod
pod.DeletionTimestamp = &now
}
},
},
ResyncPeriod: 0,
}
_, controller := cache.NewInformerWithOptions(options)
stop := make(chan struct{})
go func() {
controller.Run(stop)
panic(fmt.Errorf("Watching for pod changes stopped"))
}()
// Wait for the cache to sync
if !cache.WaitForCacheSync(stop, controller.HasSynced) {
panic(fmt.Errorf("failed to sync cache"))
}
}
func (watcher *Watcher) getPod(obj any) *corev1.Pod {
k8spod, ok := obj.(*corev1.Pod)
if !ok {
klog.Fatalf("Object of wrong type: %v", obj)
}
return k8spod
}