Reputation: 91
I'm trying to use client-go informers to get the replica count on deployments. Whenever autoscaling changes the number of replicas, I need to retrieve this in order to handle some other logic. I was previously using the Watch() function, but there are a few inconsistencies with timeouts and connection drops.
The code below shows my implementation:
labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
opts.FieldSelector = "metadata.name=" + name
})
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
informer := factory.Apps().V1().Deployments().Informer()
// Using the channels and goroutines below didn't show changes:
stopper := make(chan struct{})
defer close(stopper)
//go func() {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj, ok := obj.(*appsv1.Deployment)
if !ok {
panic(spew.Sdump("informer returned invalid type", mObj))
}
replicas := int(*mObj.Spec.Replicas)
logger.Infof("updating replicas to %d", replicas)
sendUpdates() // use updates elsewhere
},
UpdateFunc: func(oldObj, newObj interface{}) {
old, ok := oldObj.(*appsv1.Deployment)
if !ok {
panic(spew.Sdump("informer returned invalid type", old))
}
newDeployment, ok := newObj.(*appsv1.Deployment)
if !ok {
panic(spew.Sdump("informer returned invalid type", newDeployment))
}
oldReplicas := int(*old.Spec.Replicas)
newReplicas := int(*newDeployment.Spec.Replicas)
if oldReplicas != newReplicas {
sendUpdates()
}
},
})
//factory.Start(wait.NeverStop)
//factory.WaitForCacheSync(wait.NeverStop)
informer.Run(stopper)
When Kubernetes autoscales or I change the Deployment's replica count manually, I get the message deployment.apps/app scaled,
but the informer doesn't catch the change. Nothing gets printed in the logs and it enters a crash loop with no error message.
I used the following resources:
Upvotes: 4
Views: 4436
Reputation: 91
A few things to note:
Before calling informerFactory.Start(), ensure that the Informer is called directly (informer := factory.Apps().V1().Deployments().Informer()), or Start() won't start anything.
There is no need to wrap informerFactory.Start() in a goroutine, because it uses one internally.
Doing so also stops the informerFactory.WaitForCacheSync() method from working, resulting in it getting the wrong data for started informers.
labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
opts.FieldSelector = "metadata.name=" + name
})
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
informer := factory.Apps().V1().Deployments().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
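AddFunc: func(obj interface{}) {
mObj, ok := obj.(*appsv1.Deployment)
if !ok {
doSomething() // handle the unexpected type
return // mObj is nil here, so dereferencing it below would panic
}
replicas := int(*mObj.Spec.Replicas)
_ = replicas // placeholder so the snippet compiles; pass replicas to your own logic
doSomething()
},
UpdateFunc: func(oldObj, newObj interface{}) {
old, ok := oldObj.(*appsv1.Deployment)
if !ok {
doSomething() // handle the unexpected type
return
}
newDeployment, ok := newObj.(*appsv1.Deployment)
if !ok {
doSomething() // handle the unexpected type
return
}
oldReplicas := int(*old.Spec.Replicas)
newReplicas := int(*newDeployment.Spec.Replicas)
// Only react when the replica count actually changed; periodic resyncs
// also trigger UpdateFunc with identical old and new objects.
if oldReplicas != newReplicas {
doSomething()
}
},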
})
// Initializes all active informers and starts the internal goroutine
factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)
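One more note on the handlers above: Spec.Replicas is a *int32, so dereferencing it directly panics if it is ever nil. The API server normally defaults it to 1, so this is mostly a defensive measure. Below is a minimal sketch of a nil-safe comparison using only the standard library. The helper names replicasOrDefault and replicasChanged are my own and not part of client-go.

```go
package main

import "fmt"

// replicasOrDefault is a hypothetical helper (not part of client-go).
// It dereferences a *int32 such as Deployment.Spec.Replicas and falls
// back to 1, the documented default, when the pointer is nil.
func replicasOrDefault(r *int32) int32 {
	if r == nil {
		return 1
	}
	return *r
}

// replicasChanged reports whether the replica count differs between the
// old and new values, treating nil as the default of 1.
func replicasChanged(oldR, newR *int32) bool {
	return replicasOrDefault(oldR) != replicasOrDefault(newR)
}

func main() {
	three, five := int32(3), int32(5)
	fmt.Println(replicasChanged(&three, &five))  // true
	fmt.Println(replicasChanged(&three, &three)) // false
	fmt.Println(replicasOrDefault(nil))          // 1
}
```

In UpdateFunc this would be called as replicasChanged(old.Spec.Replicas, newDeployment.Spec.Replicas) in place of the two manual dereferences.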
Upvotes: 5