Migrate K8s discovery service queues to use strongly typed queues

Signed-off-by: Michael Shen <mishen@umich.edu>
This commit is contained in:
Michael Shen 2025-09-23 20:30:23 -07:00
parent 9c525b84c4
commit 1eaddc64d0
No known key found for this signature in database
GPG Key ID: 12CC712F576BDFEE
6 changed files with 40 additions and 36 deletions

View File

@ -48,7 +48,7 @@ type Endpoints struct {
endpointsStore cache.Store endpointsStore cache.Store
serviceStore cache.Store serviceStore cache.Store
queue *workqueue.Typed[any] queue *workqueue.Typed[string]
} }
// NewEndpoints returns a new endpoints discovery. // NewEndpoints returns a new endpoints discovery.
@ -80,7 +80,7 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node,
withNodeMetadata: node != nil, withNodeMetadata: node != nil,
namespaceInf: namespace, namespaceInf: namespace,
withNamespaceMetadata: namespace != nil, withNamespaceMetadata: namespace != nil,
queue: workqueue.NewTypedWithConfig[any](workqueue.TypedQueueConfig[any]{ queue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
Name: RoleEndpoint.String(), Name: RoleEndpoint.String(),
}), }),
} }
@ -275,12 +275,11 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
func (e *Endpoints) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { func (e *Endpoints) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := e.queue.Get() key, quit := e.queue.Get()
if quit { if quit {
return false return false
} }
defer e.queue.Done(keyObj) defer e.queue.Done(key)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {

View File

@ -50,7 +50,7 @@ type EndpointSlice struct {
endpointSliceStore cache.Store endpointSliceStore cache.Store
serviceStore cache.Store serviceStore cache.Store
queue *workqueue.Type queue *workqueue.Typed[string]
} }
// NewEndpointSlice returns a new endpointslice discovery. // NewEndpointSlice returns a new endpointslice discovery.
@ -79,7 +79,9 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n
withNodeMetadata: node != nil, withNodeMetadata: node != nil,
namespaceInf: namespace, namespaceInf: namespace,
withNamespaceMetadata: namespace != nil, withNamespaceMetadata: namespace != nil,
queue: workqueue.NewNamed(RoleEndpointSlice.String()), queue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
Name: RoleEndpointSlice.String(),
}),
} }
_, err := e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ _, err := e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -236,12 +238,11 @@ func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group)
} }
func (e *EndpointSlice) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { func (e *EndpointSlice) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := e.queue.Get() key, quit := e.queue.Get()
if quit { if quit {
return false return false
} }
defer e.queue.Done(keyObj) defer e.queue.Done(key)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {

View File

@ -35,7 +35,7 @@ type Ingress struct {
logger *slog.Logger logger *slog.Logger
informer cache.SharedIndexInformer informer cache.SharedIndexInformer
store cache.Store store cache.Store
queue *workqueue.Type queue *workqueue.Typed[string]
namespaceInf cache.SharedInformer namespaceInf cache.SharedInformer
withNamespaceMetadata bool withNamespaceMetadata bool
} }
@ -50,7 +50,9 @@ func NewIngress(l *slog.Logger, inf cache.SharedIndexInformer, namespace cache.S
logger: l, logger: l,
informer: inf, informer: inf,
store: inf.GetStore(), store: inf.GetStore(),
queue: workqueue.NewNamed(RoleIngress.String()), queue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
Name: RoleIngress.String(),
}),
namespaceInf: namespace, namespaceInf: namespace,
withNamespaceMetadata: namespace != nil, withNamespaceMetadata: namespace != nil,
} }
@ -137,12 +139,11 @@ func (i *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
func (i *Ingress) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { func (i *Ingress) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := i.queue.Get() key, quit := i.queue.Get()
if quit { if quit {
return false return false
} }
defer i.queue.Done(keyObj) defer i.queue.Done(key)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {

View File

@ -41,7 +41,7 @@ type Node struct {
logger *slog.Logger logger *slog.Logger
informer cache.SharedInformer informer cache.SharedInformer
store cache.Store store cache.Store
queue *workqueue.Type queue *workqueue.Typed[string]
} }
// NewNode returns a new node discovery. // NewNode returns a new node discovery.
@ -58,7 +58,9 @@ func NewNode(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus.Co
logger: l, logger: l,
informer: inf, informer: inf,
store: inf.GetStore(), store: inf.GetStore(),
queue: workqueue.NewNamed(RoleNode.String()), queue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
Name: RoleNode.String(),
}),
} }
_, err := n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ _, err := n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -111,12 +113,11 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
func (n *Node) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { func (n *Node) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := n.queue.Get() key, quit := n.queue.Get()
if quit { if quit {
return false return false
} }
defer n.queue.Done(keyObj) defer n.queue.Done(key)
key := keyObj.(string)
_, name, err := cache.SplitMetaNamespaceKey(key) _, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {

View File

@ -47,7 +47,7 @@ type Pod struct {
withNamespaceMetadata bool withNamespaceMetadata bool
store cache.Store store cache.Store
logger *slog.Logger logger *slog.Logger
queue *workqueue.Type queue *workqueue.Typed[string]
} }
// NewPod creates a new pod discovery. // NewPod creates a new pod discovery.
@ -68,7 +68,9 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes, namespace cac
withNamespaceMetadata: namespace != nil, withNamespaceMetadata: namespace != nil,
store: pods.GetStore(), store: pods.GetStore(),
logger: l, logger: l,
queue: workqueue.NewNamed(RolePod.String()), queue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
Name: RolePod.String(),
}),
} }
_, err := p.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ _, err := p.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o any) { AddFunc: func(o any) {
@ -166,12 +168,11 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
func (p *Pod) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { func (p *Pod) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := p.queue.Get() key, quit := p.queue.Get()
if quit { if quit {
return false return false
} }
defer p.queue.Done(keyObj) defer p.queue.Done(key)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {

View File

@ -36,7 +36,7 @@ type Service struct {
logger *slog.Logger logger *slog.Logger
informer cache.SharedIndexInformer informer cache.SharedIndexInformer
store cache.Store store cache.Store
queue *workqueue.Type queue *workqueue.Typed[string]
namespaceInf cache.SharedInformer namespaceInf cache.SharedInformer
withNamespaceMetadata bool withNamespaceMetadata bool
} }
@ -55,7 +55,9 @@ func NewService(l *slog.Logger, inf cache.SharedIndexInformer, namespace cache.S
logger: l, logger: l,
informer: inf, informer: inf,
store: inf.GetStore(), store: inf.GetStore(),
queue: workqueue.NewNamed(RoleService.String()), queue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
Name: RoleService.String(),
}),
namespaceInf: namespace, namespaceInf: namespace,
withNamespaceMetadata: namespace != nil, withNamespaceMetadata: namespace != nil,
} }
@ -142,12 +144,11 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
func (s *Service) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { func (s *Service) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := s.queue.Get() key, quit := s.queue.Get()
if quit { if quit {
return false return false
} }
defer s.queue.Done(keyObj) defer s.queue.Done(key)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {