Merge pull request #16831 from machine424/nsmeta

feat(discovery/kubernetes): allow attaching namespace metadata
This commit is contained in:
Ayoub Mrini 2025-07-17 10:30:27 +01:00 committed by GitHub
commit 9dc274687b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1179 additions and 194 deletions

View File

@ -35,11 +35,13 @@ import (
type Endpoints struct {
logger *slog.Logger
endpointsInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
endpointsInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
podStore cache.Store
endpointsStore cache.Store
@ -50,7 +52,7 @@ type Endpoints struct {
// NewEndpoints returns a new endpoints discovery.
// Endpoints API is deprecated in k8s v1.33+, but we should still support it.
func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints {
func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints {
if l == nil {
l = promslog.NewNopLogger()
}
@ -66,16 +68,18 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node
podUpdateCount := eventCount.WithLabelValues(RolePod.String(), MetricLabelRoleUpdate)
e := &Endpoints{
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed(RoleEndpoint.String()),
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
queue: workqueue.NewNamed(RoleEndpoint.String()),
}
_, err := e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -177,6 +181,20 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node
}
}
if e.withNamespaceMetadata {
_, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
e.enqueueNamespace(namespace.Name)
},
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
// No need to have additional handlers for them here.
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return e
}
@ -192,6 +210,18 @@ func (e *Endpoints) enqueueNode(nodeName string) {
}
}
func (e *Endpoints) enqueueNamespace(namespace string) {
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
e.logger.Error("Error getting endpoints in namespace", "namespace", namespace, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *Endpoints) enqueuePod(podNamespacedName string) {
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(podIndex, podNamespacedName)
if err != nil {
@ -221,6 +251,9 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if e.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, e.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
@ -308,6 +341,10 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group {
// Add endpoints labels metadata.
addObjectMetaLabels(tg.Labels, eps.ObjectMeta, RoleEndpoint)
if e.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, e.namespaceInf, e.logger, eps.Namespace)
}
type podEntry struct {
pod *apiv1.Pod
servicePorts []apiv1.EndpointPort
@ -502,3 +539,20 @@ func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger *slog
addObjectMetaLabels(nodeLabelset, node.ObjectMeta, RoleNode)
return tg.Merge(nodeLabelset)
}
func addNamespaceLabels(tg model.LabelSet, namespaceInf cache.SharedInformer, logger *slog.Logger, namespace string) model.LabelSet {
obj, exists, err := namespaceInf.GetStore().GetByKey(namespace)
if err != nil {
logger.Error("Error getting namespace", "namespace", namespace, "err", err)
return tg
}
if !exists {
return tg
}
n := obj.(*apiv1.Namespace)
namespaceLabelset := make(model.LabelSet)
addNamespaceMetaLabels(namespaceLabelset, n.ObjectMeta)
return tg.Merge(namespaceLabelset)
}

View File

@ -15,6 +15,7 @@ package kubernetes
import (
"context"
"fmt"
"testing"
"github.com/prometheus/common/model"
@ -28,12 +29,12 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
)
func makeEndpoints() *v1.Endpoints {
func makeEndpoints(namespace string) *v1.Endpoints {
nodeName := "foobar"
return &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Namespace: namespace,
Annotations: map[string]string{
"test.annotation": "test",
},
@ -103,7 +104,7 @@ func TestEndpointsDiscoveryBeforeRun(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
obj := makeEndpoints()
obj := makeEndpoints("default")
c.CoreV1().Endpoints(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -279,12 +280,12 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
func TestEndpointsDiscoveryDelete(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpoints()
obj := makeEndpoints("default")
c.CoreV1().Endpoints(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
},
expectedMaxItems: 2,
@ -298,7 +299,7 @@ func TestEndpointsDiscoveryDelete(t *testing.T) {
func TestEndpointsDiscoveryUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -370,7 +371,7 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) {
func TestEndpointsDiscoveryEmptySubsets(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -399,7 +400,7 @@ func TestEndpointsDiscoveryEmptySubsets(t *testing.T) {
func TestEndpointsDiscoveryWithService(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -465,7 +466,7 @@ func TestEndpointsDiscoveryWithService(t *testing.T) {
func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -560,7 +561,7 @@ func TestEndpointsDiscoveryWithNodeMetadata(t *testing.T) {
},
},
}
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), svc, node1, node2)
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints("default"), svc, node1, node2)
k8sDiscoveryTest{
discovery: n,
@ -634,7 +635,7 @@ func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
},
},
}
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), node1, node2, svc)
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints("default"), node1, node2, svc)
k8sDiscoveryTest{
discovery: n,
@ -698,7 +699,7 @@ func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
func TestEndpointsDiscoveryNamespaces(t *testing.T) {
t.Parallel()
epOne := makeEndpoints()
epOne := makeEndpoints("default")
epOne.Namespace = "ns1"
objs := []runtime.Object{
epOne,
@ -850,10 +851,10 @@ func TestEndpointsDiscoveryNamespaces(t *testing.T) {
func TestEndpointsDiscoveryOwnNamespace(t *testing.T) {
t.Parallel()
epOne := makeEndpoints()
epOne := makeEndpoints("default")
epOne.Namespace = "own-ns"
epTwo := makeEndpoints()
epTwo := makeEndpoints("default")
epTwo.Namespace = "non-own-ns"
podOne := &v1.Pod{
@ -945,7 +946,7 @@ func TestEndpointsDiscoveryOwnNamespace(t *testing.T) {
func TestEndpointsDiscoveryEmptyPodStatus(t *testing.T) {
t.Parallel()
ep := makeEndpoints()
ep := makeEndpoints("default")
ep.Namespace = "ns"
pod := &v1.Pod{
@ -1274,6 +1275,145 @@ func TestEndpointsDiscoverySidecarContainer(t *testing.T) {
}.Run(t)
}
func TestEndpointsDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"environment": "production", "team": "backend"}
nsAnnotations := map[string]string{"owner": "platform", "version": "v1.2.3"}
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makeEndpoints(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpoints/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
{
"__address__": "6.7.8.9:9002",
"__meta_kubernetes_endpoint_address_target_kind": "Node",
"__meta_kubernetes_endpoint_address_target_name": "barbaz",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_owner": "platform",
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
"__meta_kubernetes_namespace_annotation_version": "v1.2.3",
"__meta_kubernetes_namespace_annotationpresent_version": "true",
"__meta_kubernetes_namespace_label_environment": "production",
"__meta_kubernetes_namespace_labelpresent_environment": "true",
"__meta_kubernetes_namespace_label_team": "backend",
"__meta_kubernetes_namespace_labelpresent_team": "true",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_endpoints_annotation_test_annotation": "test",
"__meta_kubernetes_endpoints_annotationpresent_test_annotation": "true",
},
Source: fmt.Sprintf("endpoints/%s/testendpoints", ns),
},
},
}.Run(t)
}
func TestEndpointsDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"environment": "development", "team": "frontend"}
nsAnnotations := map[string]string{"owner": "devops", "version": "v2.1.0"}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makeEndpoints(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
namespace.Labels["environment"] = "staging"
namespace.Labels["region"] = "us-west"
namespace.Annotations["owner"] = "sre"
namespace.Annotations["cost-center"] = "engineering"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpoints/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
{
"__address__": "6.7.8.9:9002",
"__meta_kubernetes_endpoint_address_target_kind": "Node",
"__meta_kubernetes_endpoint_address_target_name": "barbaz",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_owner": "sre",
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
"__meta_kubernetes_namespace_annotation_version": "v2.1.0",
"__meta_kubernetes_namespace_annotationpresent_version": "true",
"__meta_kubernetes_namespace_annotation_cost_center": "engineering",
"__meta_kubernetes_namespace_annotationpresent_cost_center": "true",
"__meta_kubernetes_namespace_label_environment": "staging",
"__meta_kubernetes_namespace_labelpresent_environment": "true",
"__meta_kubernetes_namespace_label_team": "frontend",
"__meta_kubernetes_namespace_labelpresent_team": "true",
"__meta_kubernetes_namespace_label_region": "us-west",
"__meta_kubernetes_namespace_labelpresent_region": "true",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_endpoints_annotation_test_annotation": "test",
"__meta_kubernetes_endpoints_annotationpresent_test_annotation": "true",
},
Source: fmt.Sprintf("endpoints/%s/testendpoints", ns),
},
},
}.Run(t)
}
func BenchmarkResolvePodRef(b *testing.B) {
indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil)
e := &Endpoints{

View File

@ -38,11 +38,13 @@ const serviceIndex = "service"
type EndpointSlice struct {
logger *slog.Logger
endpointSliceInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
endpointSliceInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
podStore cache.Store
endpointSliceStore cache.Store
@ -52,7 +54,7 @@ type EndpointSlice struct {
}
// NewEndpointSlice returns a new endpointslice discovery.
func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *EndpointSlice {
func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *EndpointSlice {
if l == nil {
l = promslog.NewNopLogger()
}
@ -66,16 +68,18 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n
svcDeleteCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleDelete)
e := &EndpointSlice{
logger: l,
endpointSliceInf: eps,
endpointSliceStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed(RoleEndpointSlice.String()),
logger: l,
endpointSliceInf: eps,
endpointSliceStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
queue: workqueue.NewNamed(RoleEndpointSlice.String()),
}
_, err := e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -154,6 +158,20 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n
}
}
if e.withNamespaceMetadata {
_, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
e.enqueueNamespace(namespace.Name)
},
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
// No need to have additional handlers for them here.
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return e
}
@ -169,6 +187,18 @@ func (e *EndpointSlice) enqueueNode(nodeName string) {
}
}
func (e *EndpointSlice) enqueueNamespace(namespace string) {
endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
e.logger.Error("Error getting endpoints in namespace", "namespace", namespace, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *EndpointSlice) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
@ -186,6 +216,9 @@ func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group)
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if e.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, e.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
e.logger.Error("endpointslice informer unable to sync cache")
@ -274,6 +307,10 @@ func (e *EndpointSlice) buildEndpointSlice(eps v1.EndpointSlice) *targetgroup.Gr
e.addServiceLabels(eps, tg)
if e.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, e.namespaceInf, e.logger, eps.Namespace)
}
type podEntry struct {
pod *apiv1.Pod
servicePorts []v1.EndpointPort

View File

@ -15,6 +15,7 @@ package kubernetes
import (
"context"
"fmt"
"testing"
"github.com/prometheus/common/model"
@ -44,11 +45,11 @@ func protocolptr(p corev1.Protocol) *corev1.Protocol {
return &p
}
func makeEndpointSliceV1() *v1.EndpointSlice {
func makeEndpointSliceV1(namespace string) *v1.EndpointSlice {
return &v1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Namespace: namespace,
Labels: map[string]string{
v1.LabelServiceName: "testendpoints",
},
@ -113,6 +114,16 @@ func makeEndpointSliceV1() *v1.EndpointSlice {
}
}
func makeNamespace(name string, labels, annotations map[string]string) *corev1.Namespace {
return &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
Annotations: annotations,
},
}
}
func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}})
@ -120,7 +131,7 @@ func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
c.DiscoveryV1().EndpointSlices(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -325,12 +336,12 @@ func TestEndpointSliceDiscoveryAdd(t *testing.T) {
func TestEndpointSliceDiscoveryDelete(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
c.DiscoveryV1().EndpointSlices(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
},
expectedMaxItems: 2,
@ -344,12 +355,12 @@ func TestEndpointSliceDiscoveryDelete(t *testing.T) {
func TestEndpointSliceDiscoveryUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
obj.ObjectMeta.Labels = nil
obj.ObjectMeta.Annotations = nil
obj.Endpoints = obj.Endpoints[0:2]
@ -401,12 +412,12 @@ func TestEndpointSliceDiscoveryUpdate(t *testing.T) {
func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
obj.Endpoints = []v1.Endpoint{}
c.DiscoveryV1().EndpointSlices(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{})
},
@ -430,7 +441,7 @@ func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) {
func TestEndpointSliceDiscoveryWithService(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
@ -523,7 +534,7 @@ func TestEndpointSliceDiscoveryWithService(t *testing.T) {
func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
@ -643,7 +654,7 @@ func TestEndpointsSlicesDiscoveryWithNodeMetadata(t *testing.T) {
},
},
}
objs := []runtime.Object{makeEndpointSliceV1(), makeNode("foobar", "", "", nodeLabels1, nil), makeNode("barbaz", "", "", nodeLabels2, nil), svc}
objs := []runtime.Object{makeEndpointSliceV1("default"), makeNode("foobar", "", "", nodeLabels1, nil), makeNode("barbaz", "", "", nodeLabels2, nil), svc}
n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
@ -745,7 +756,7 @@ func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
}
node1 := makeNode("foobar", "", "", nodeLabels1, nil)
node2 := makeNode("barbaz", "", "", nodeLabels2, nil)
objs := []runtime.Object{makeEndpointSliceV1(), node1, node2, svc}
objs := []runtime.Object{makeEndpointSliceV1("default"), node1, node2, svc}
n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
@ -837,7 +848,7 @@ func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
func TestEndpointSliceDiscoveryNamespaces(t *testing.T) {
t.Parallel()
epOne := makeEndpointSliceV1()
epOne := makeEndpointSliceV1("default")
epOne.Namespace = "ns1"
objs := []runtime.Object{
epOne,
@ -1014,10 +1025,10 @@ func TestEndpointSliceDiscoveryNamespaces(t *testing.T) {
func TestEndpointSliceDiscoveryOwnNamespace(t *testing.T) {
t.Parallel()
epOne := makeEndpointSliceV1()
epOne := makeEndpointSliceV1("default")
epOne.Namespace = "own-ns"
epTwo := makeEndpointSliceV1()
epTwo := makeEndpointSliceV1("default")
epTwo.Namespace = "non-own-ns"
podOne := &corev1.Pod{
@ -1135,7 +1146,7 @@ func TestEndpointSliceDiscoveryOwnNamespace(t *testing.T) {
func TestEndpointSliceDiscoveryEmptyPodStatus(t *testing.T) {
t.Parallel()
ep := makeEndpointSliceV1()
ep := makeEndpointSliceV1("default")
ep.Namespace = "ns"
pod := &corev1.Pod{
@ -1380,3 +1391,223 @@ func TestEndpointSliceDiscoverySidecarContainer(t *testing.T) {
},
}.Run(t)
}
func TestEndpointsSlicesDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"service": "web", "layer": "frontend"}
nsAnnotations := map[string]string{"contact": "platform", "release": "v5.6.7"}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: ns,
Labels: map[string]string{
"app/name": "test",
},
},
}
objs := []runtime.Object{makeNamespace(ns, nsLabels, nsAnnotations), svc, makeEndpointSliceV1(ns)}
n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, objs...)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpointslice/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_node_name": "foobar",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1b",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "true",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1c",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "4.5.6.7:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "Node",
"__meta_kubernetes_endpointslice_address_target_name": "barbaz",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "testendpoints",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_annotation_test_annotation": "test",
"__meta_kubernetes_endpointslice_annotationpresent_test_annotation": "true",
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_contact": "platform",
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
"__meta_kubernetes_namespace_annotation_release": "v5.6.7",
"__meta_kubernetes_namespace_annotationpresent_release": "true",
"__meta_kubernetes_namespace_label_service": "web",
"__meta_kubernetes_namespace_labelpresent_service": "true",
"__meta_kubernetes_namespace_label_layer": "frontend",
"__meta_kubernetes_namespace_labelpresent_layer": "true",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: fmt.Sprintf("endpointslice/%s/testendpoints", ns),
},
},
}.Run(t)
}
func TestEndpointsSlicesDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"component": "database", "layer": "backend"}
nsAnnotations := map[string]string{"contact": "dba", "release": "v6.7.8"}
metadataConfig := AttachMetadataConfig{Namespace: true}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: ns,
Labels: map[string]string{
"app/name": "test",
},
},
}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, namespace, svc, makeEndpointSliceV1(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
namespace.Labels["component"] = "cache"
namespace.Labels["region"] = "us-central"
namespace.Annotations["contact"] = "sre"
namespace.Annotations["monitoring"] = "enabled"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpointslice/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_node_name": "foobar",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1b",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "true",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1c",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "4.5.6.7:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "Node",
"__meta_kubernetes_endpointslice_address_target_name": "barbaz",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "testendpoints",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_annotation_test_annotation": "test",
"__meta_kubernetes_endpointslice_annotationpresent_test_annotation": "true",
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_contact": "sre",
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
"__meta_kubernetes_namespace_annotation_release": "v6.7.8",
"__meta_kubernetes_namespace_annotationpresent_release": "true",
"__meta_kubernetes_namespace_annotation_monitoring": "enabled",
"__meta_kubernetes_namespace_annotationpresent_monitoring": "true",
"__meta_kubernetes_namespace_label_component": "cache",
"__meta_kubernetes_namespace_labelpresent_component": "true",
"__meta_kubernetes_namespace_label_layer": "backend",
"__meta_kubernetes_namespace_labelpresent_layer": "true",
"__meta_kubernetes_namespace_label_region": "us-central",
"__meta_kubernetes_namespace_labelpresent_region": "true",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: fmt.Sprintf("endpointslice/%s/testendpoints", ns),
},
},
}.Run(t)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@ -31,23 +32,27 @@ import (
// Ingress implements discovery of Kubernetes ingress.
type Ingress struct {
logger *slog.Logger
informer cache.SharedInformer
store cache.Store
queue *workqueue.Type
logger *slog.Logger
informer cache.SharedIndexInformer
store cache.Store
queue *workqueue.Type
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
}
// NewIngress returns a new ingress discovery.
func NewIngress(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus.CounterVec) *Ingress {
func NewIngress(l *slog.Logger, inf cache.SharedIndexInformer, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Ingress {
ingressAddCount := eventCount.WithLabelValues(RoleIngress.String(), MetricLabelRoleAdd)
ingressUpdateCount := eventCount.WithLabelValues(RoleIngress.String(), MetricLabelRoleUpdate)
ingressDeleteCount := eventCount.WithLabelValues(RoleIngress.String(), MetricLabelRoleDelete)
s := &Ingress{
logger: l,
informer: inf,
store: inf.GetStore(),
queue: workqueue.NewNamed(RoleIngress.String()),
logger: l,
informer: inf,
store: inf.GetStore(),
queue: workqueue.NewNamed(RoleIngress.String()),
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
}
_, err := s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -67,6 +72,21 @@ func NewIngress(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus
if err != nil {
l.Error("Error adding ingresses event handler.", "err", err)
}
if s.withNamespaceMetadata {
_, err = s.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
s.enqueueNamespace(namespace.Name)
},
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
// No need to have additional handlers for them here.
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return s
}
@ -79,11 +99,28 @@ func (i *Ingress) enqueue(obj interface{}) {
i.queue.Add(key)
}
func (i *Ingress) enqueueNamespace(namespace string) {
ingresses, err := i.informer.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
i.logger.Error("Error getting ingresses in namespace", "namespace", namespace, "err", err)
return
}
for _, ingress := range ingresses {
i.enqueue(ingress)
}
}
// Run implements the Discoverer interface.
func (i *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer i.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), i.informer.HasSynced) {
cacheSyncs := []cache.InformerSynced{i.informer.HasSynced}
if i.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, i.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
i.logger.Error("ingress informer unable to sync cache")
}
@ -200,6 +237,10 @@ func (i *Ingress) buildIngress(ingress v1.Ingress) *targetgroup.Group {
}
tg.Labels = ingressLabels(ingress)
if i.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, i.namespaceInf, i.logger, ingress.Namespace)
}
for _, rule := range ingress.Spec.Rules {
scheme := "http"
paths := pathsFromIngressPaths(rulePaths(rule))

View File

@ -34,11 +34,11 @@ const (
TLSWildcard
)
func makeIngress(tls TLSMode) *v1.Ingress {
func makeIngress(namespace string, tls TLSMode) *v1.Ingress {
ret := &v1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "testingress",
Namespace: "default",
Namespace: namespace,
Labels: map[string]string{"test/label": "testvalue"},
Annotations: map[string]string{"test/annotation": "testannotationvalue"},
},
@ -150,7 +150,7 @@ func TestIngressDiscoveryAdd(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeIngress(TLSNo)
obj := makeIngress("default", TLSNo)
c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -165,7 +165,7 @@ func TestIngressDiscoveryAddTLS(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeIngress(TLSYes)
obj := makeIngress("default", TLSYes)
c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -180,7 +180,7 @@ func TestIngressDiscoveryAddMixed(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeIngress(TLSMixed)
obj := makeIngress("default", TLSMixed)
c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -200,8 +200,7 @@ func TestIngressDiscoveryNamespaces(t *testing.T) {
discovery: n,
afterStart: func() {
for _, ns := range []string{"ns1", "ns2"} {
obj := makeIngress(TLSNo)
obj.Namespace = ns
obj := makeIngress(ns, TLSNo)
c.NetworkingV1().Ingresses(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
}
},
@ -219,8 +218,7 @@ func TestIngressDiscoveryOwnNamespace(t *testing.T) {
discovery: n,
afterStart: func() {
for _, ns := range []string{"own-ns", "non-own-ns"} {
obj := makeIngress(TLSNo)
obj.Namespace = ns
obj := makeIngress(ns, TLSNo)
c.NetworkingV1().Ingresses(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
}
},
@ -228,3 +226,128 @@ func TestIngressDiscoveryOwnNamespace(t *testing.T) {
expectedRes: expected,
}.Run(t)
}
func TestIngressDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"service": "web", "layer": "frontend"}
nsAnnotations := map[string]string{"contact": "platform", "release": "v5.6.7"}
n, _ := makeDiscoveryWithMetadata(RoleIngress, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makeIngress(ns, TLSNo))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("ingress/%s/testingress", ns): {
Targets: []model.LabelSet{
{
"__address__": "example.com",
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/",
"__meta_kubernetes_ingress_scheme": "http",
},
{
"__address__": "example.com",
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/foo",
"__meta_kubernetes_ingress_scheme": "http",
},
{
"__address__": "test.example.com",
"__meta_kubernetes_ingress_host": "test.example.com",
"__meta_kubernetes_ingress_path": "/",
"__meta_kubernetes_ingress_scheme": "http",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_contact": "platform",
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
"__meta_kubernetes_namespace_annotation_release": "v5.6.7",
"__meta_kubernetes_namespace_annotationpresent_release": "true",
"__meta_kubernetes_namespace_label_service": "web",
"__meta_kubernetes_namespace_labelpresent_service": "true",
"__meta_kubernetes_namespace_label_layer": "frontend",
"__meta_kubernetes_namespace_labelpresent_layer": "true",
"__meta_kubernetes_ingress_name": "testingress",
"__meta_kubernetes_ingress_label_test_label": "testvalue",
"__meta_kubernetes_ingress_labelpresent_test_label": "true",
"__meta_kubernetes_ingress_annotation_test_annotation": "testannotationvalue",
"__meta_kubernetes_ingress_annotationpresent_test_annotation": "true",
"__meta_kubernetes_ingress_class_name": "testclass",
},
Source: fmt.Sprintf("ingress/%s/testingress", ns),
},
},
}.Run(t)
}
func TestIngressDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"component": "database", "layer": "backend"}
nsAnnotations := map[string]string{"contact": "dba", "release": "v6.7.8"}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RoleIngress, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makeIngress(ns, TLSNo))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
namespace.Labels["component"] = "cache"
namespace.Labels["region"] = "us-central"
namespace.Annotations["contact"] = "sre"
namespace.Annotations["monitoring"] = "enabled"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("ingress/%s/testingress", ns): {
Targets: []model.LabelSet{
{
"__address__": "example.com",
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/",
"__meta_kubernetes_ingress_scheme": "http",
},
{
"__address__": "example.com",
"__meta_kubernetes_ingress_host": "example.com",
"__meta_kubernetes_ingress_path": "/foo",
"__meta_kubernetes_ingress_scheme": "http",
},
{
"__address__": "test.example.com",
"__meta_kubernetes_ingress_host": "test.example.com",
"__meta_kubernetes_ingress_path": "/",
"__meta_kubernetes_ingress_scheme": "http",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_contact": "sre",
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
"__meta_kubernetes_namespace_annotation_release": "v6.7.8",
"__meta_kubernetes_namespace_annotationpresent_release": "true",
"__meta_kubernetes_namespace_annotation_monitoring": "enabled",
"__meta_kubernetes_namespace_annotationpresent_monitoring": "true",
"__meta_kubernetes_namespace_label_component": "cache",
"__meta_kubernetes_namespace_labelpresent_component": "true",
"__meta_kubernetes_namespace_label_layer": "backend",
"__meta_kubernetes_namespace_labelpresent_layer": "true",
"__meta_kubernetes_namespace_label_region": "us-central",
"__meta_kubernetes_namespace_labelpresent_region": "true",
"__meta_kubernetes_ingress_name": "testingress",
"__meta_kubernetes_ingress_label_test_label": "testvalue",
"__meta_kubernetes_ingress_labelpresent_test_label": "true",
"__meta_kubernetes_ingress_annotation_test_annotation": "testannotationvalue",
"__meta_kubernetes_ingress_annotationpresent_test_annotation": "true",
"__meta_kubernetes_ingress_class_name": "testclass",
},
Source: fmt.Sprintf("ingress/%s/testingress", ns),
},
},
}.Run(t)
}

View File

@ -153,9 +153,10 @@ type resourceSelector struct {
}
// AttachMetadataConfig is the configuration for attaching additional metadata
// coming from nodes on which the targets are scheduled.
// coming from namespaces or nodes on which the targets are scheduled.
type AttachMetadataConfig struct {
Node bool `yaml:"node"`
Node bool `yaml:"node"`
Namespace bool `yaml:"namespace"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -397,7 +398,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return e.Watch(ctx, options)
},
}
informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{})
informer = d.newIndexedEndpointSlicesInformer(elw, &disv1.EndpointSlice{})
s := d.client.CoreV1().Services(namespace)
slw := &cache.ListWatch{
@ -430,12 +431,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
nodeInf = d.newNodeInformer(context.Background())
go nodeInf.Run(ctx.Done())
}
var namespaceInf cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInf = d.newNamespaceInformer(context.Background())
go namespaceInf.Run(ctx.Done())
}
eps := NewEndpointSlice(
d.logger.With("role", "endpointslice"),
informer,
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf,
namespaceInf,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, eps)
@ -489,13 +496,19 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
nodeInf = d.newNodeInformer(ctx)
go nodeInf.Run(ctx.Done())
}
var namespaceInf cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInf = d.newNamespaceInformer(ctx)
go namespaceInf.Run(ctx.Done())
}
eps := NewEndpoints(
d.logger.With("role", "endpoint"),
d.newEndpointsByNodeInformer(elw),
d.newIndexedEndpointsInformer(elw),
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf,
namespaceInf,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, eps)
@ -509,6 +522,11 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
nodeInformer = d.newNodeInformer(ctx)
go nodeInformer.Run(ctx.Done())
}
var namespaceInformer cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInformer = d.newNamespaceInformer(ctx)
go namespaceInformer.Run(ctx.Done())
}
for _, namespace := range namespaces {
p := d.client.CoreV1().Pods(namespace)
@ -526,14 +544,21 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
pod := NewPod(
d.logger.With("role", "pod"),
d.newPodsByNodeInformer(plw),
d.newIndexedPodsInformer(plw),
nodeInformer,
namespaceInformer,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, pod)
go pod.podInf.Run(ctx.Done())
}
case RoleService:
var namespaceInformer cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInformer = d.newNamespaceInformer(ctx)
go namespaceInformer.Run(ctx.Done())
}
for _, namespace := range namespaces {
s := d.client.CoreV1().Services(namespace)
slw := &cache.ListWatch{
@ -550,15 +575,21 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
svc := NewService(
d.logger.With("role", "service"),
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.newIndexedServicesInformer(slw),
namespaceInformer,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, svc)
go svc.informer.Run(ctx.Done())
}
case RoleIngress:
var namespaceInformer cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInformer = d.newNamespaceInformer(ctx)
go namespaceInformer.Run(ctx.Done())
}
for _, namespace := range namespaces {
var informer cache.SharedInformer
i := d.client.NetworkingV1().Ingresses(namespace)
ilw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -572,10 +603,10 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return i.Watch(ctx, options)
},
}
informer = d.mustNewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
ingress := NewIngress(
d.logger.With("role", "ingress"),
informer,
d.newIndexedIngressesInformer(ilw),
namespaceInformer,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, ingress)
@ -651,7 +682,20 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
return d.mustNewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
}
func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
func (d *Discovery) newNamespaceInformer(ctx context.Context) cache.SharedInformer {
// We don't filter on NamespaceDiscovery.
nlw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Namespaces().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Namespaces().Watch(ctx, options)
},
}
return d.mustNewSharedInformer(nlw, &apiv1.Namespace{}, resyncDisabled)
}
func (d *Discovery) newIndexedPodsInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if d.attachMetadata.Node {
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
@ -663,10 +707,14 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
}
}
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
}
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
func (d *Discovery) newIndexedEndpointsInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
indexers[podIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*apiv1.Endpoints)
@ -683,37 +731,40 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
}
return pods, nil
}
if !d.attachMetadata.Node {
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
}
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*apiv1.Endpoints)
if !ok {
return nil, errors.New("object is not endpoints")
}
var nodes []string
for _, target := range e.Subsets {
for _, addr := range target.Addresses {
if addr.TargetRef != nil {
switch addr.TargetRef.Kind {
case "Pod":
if addr.NodeName != nil {
nodes = append(nodes, *addr.NodeName)
if d.attachMetadata.Node {
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*apiv1.Endpoints)
if !ok {
return nil, errors.New("object is not endpoints")
}
var nodes []string
for _, target := range e.Subsets {
for _, addr := range target.Addresses {
if addr.TargetRef != nil {
switch addr.TargetRef.Kind {
case "Pod":
if addr.NodeName != nil {
nodes = append(nodes, *addr.NodeName)
}
case "Node":
nodes = append(nodes, addr.TargetRef.Name)
}
case "Node":
nodes = append(nodes, addr.TargetRef.Name)
}
}
}
return nodes, nil
}
return nodes, nil
}
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
}
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
func (d *Discovery) newIndexedEndpointSlicesInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
indexers[serviceIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*disv1.EndpointSlice)
@ -728,36 +779,59 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object
return []string{namespacedName(e.Namespace, svcName)}, nil
}
if !d.attachMetadata.Node {
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*disv1.EndpointSlice)
if !ok {
return nil, errors.New("object is not an endpointslice")
}
if d.attachMetadata.Node {
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*disv1.EndpointSlice)
if !ok {
return nil, errors.New("object is not an endpointslice")
}
var nodes []string
for _, target := range e.Endpoints {
if target.TargetRef != nil {
switch target.TargetRef.Kind {
case "Pod":
if target.NodeName != nil {
nodes = append(nodes, *target.NodeName)
var nodes []string
for _, target := range e.Endpoints {
if target.TargetRef != nil {
switch target.TargetRef.Kind {
case "Pod":
if target.NodeName != nil {
nodes = append(nodes, *target.NodeName)
}
case "Node":
nodes = append(nodes, target.TargetRef.Name)
}
case "Node":
nodes = append(nodes, target.TargetRef.Name)
}
}
}
return nodes, nil
return nodes, nil
}
}
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}
func (d *Discovery) newIndexedServicesInformer(slw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(slw, &apiv1.Service{}, resyncDisabled, indexers)
}
func (d *Discovery) newIndexedIngressesInformer(ilw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(ilw, &networkv1.Ingress{}, resyncDisabled, indexers)
}
func (d *Discovery) informerWatchErrorHandler(r *cache.Reflector, err error) {
d.metrics.failuresCount.Inc()
cache.DefaultWatchErrorHandler(r, err)
@ -783,22 +857,29 @@ func (d *Discovery) mustNewSharedIndexInformer(lw cache.ListerWatcher, exampleOb
return informer
}
func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, role Role) {
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_name")] = lv(objectMeta.Name)
func addObjectAnnotationsAndLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, resource string) {
for k, v := range objectMeta.Labels {
ln := strutil.SanitizeLabelName(k)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_label_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_labelpresent_"+ln)] = presentValue
labelSet[model.LabelName(metaLabelPrefix+resource+"_label_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+resource+"_labelpresent_"+ln)] = presentValue
}
for k, v := range objectMeta.Annotations {
ln := strutil.SanitizeLabelName(k)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotation_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotationpresent_"+ln)] = presentValue
labelSet[model.LabelName(metaLabelPrefix+resource+"_annotation_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+resource+"_annotationpresent_"+ln)] = presentValue
}
}
func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, role Role) {
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_name")] = lv(objectMeta.Name)
addObjectAnnotationsAndLabels(labelSet, objectMeta, string(role))
}
func addNamespaceMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta) {
// Omitting the namespace name because should be already injected elsewhere.
addObjectAnnotationsAndLabels(labelSet, objectMeta, "namespace")
}
func namespacedName(namespace, name string) string {
return namespace + "/" + name
}

View File

@ -40,16 +40,18 @@ const (
// Pod discovers new pod targets.
type Pod struct {
podInf cache.SharedIndexInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
store cache.Store
logger *slog.Logger
queue *workqueue.Type
podInf cache.SharedIndexInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
store cache.Store
logger *slog.Logger
queue *workqueue.Type
}
// NewPod creates a new pod discovery.
func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedInformer, eventCount *prometheus.CounterVec) *Pod {
func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Pod {
if l == nil {
l = promslog.NewNopLogger()
}
@ -59,12 +61,14 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn
podUpdateCount := eventCount.WithLabelValues(RolePod.String(), MetricLabelRoleUpdate)
p := &Pod{
podInf: pods,
nodeInf: nodes,
withNodeMetadata: nodes != nil,
store: pods.GetStore(),
logger: l,
queue: workqueue.NewNamed(RolePod.String()),
podInf: pods,
nodeInf: nodes,
withNodeMetadata: nodes != nil,
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
store: pods.GetStore(),
logger: l,
queue: workqueue.NewNamed(RolePod.String()),
}
_, err := p.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
@ -107,6 +111,20 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn
}
}
if p.withNamespaceMetadata {
_, err = p.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
p.enqueuePodsForNamespace(namespace.Name)
},
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
// No need to have additional handlers for them here.
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return p
}
@ -127,6 +145,9 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
if p.withNodeMetadata {
cacheSyncs = append(cacheSyncs, p.nodeInf.HasSynced)
}
if p.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, p.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
@ -269,6 +290,9 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
if p.withNodeMetadata {
tg.Labels = addNodeLabels(tg.Labels, p.nodeInf, p.logger, &pod.Spec.NodeName)
}
if p.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, p.namespaceInf, p.logger, pod.Namespace)
}
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
for i, c := range containers {
@ -327,6 +351,18 @@ func (p *Pod) enqueuePodsForNode(nodeName string) {
}
}
func (p *Pod) enqueuePodsForNamespace(namespace string) {
pods, err := p.podInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
p.logger.Error("Error getting pods in namespace", "namespace", namespace, "err", err)
return
}
for _, pod := range pods {
p.enqueue(pod.(*apiv1.Pod))
}
}
func podSource(pod *apiv1.Pod) string {
return podSourceFromNamespaceAndName(pod.Namespace, pod.Name)
}

View File

@ -95,11 +95,11 @@ func makeMultiPortPods() *v1.Pod {
}
}
func makePods() *v1.Pod {
func makePods(namespace string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: "default",
Namespace: namespace,
UID: types.UID("abc123"),
},
Spec: v1.PodSpec{
@ -337,7 +337,7 @@ func TestPodDiscoveryAdd(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makePods()
obj := makePods("default")
c.CoreV1().Pods(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -347,13 +347,13 @@ func TestPodDiscoveryAdd(t *testing.T) {
func TestPodDiscoveryDelete(t *testing.T) {
t.Parallel()
obj := makePods()
obj := makePods("default")
n, c := makeDiscovery(RolePod, NamespaceDiscovery{}, obj)
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makePods()
obj := makePods("default")
c.CoreV1().Pods(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
},
expectedMaxItems: 2,
@ -399,7 +399,7 @@ func TestPodDiscoveryUpdate(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makePods()
obj := makePods("default")
c.CoreV1().Pods(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
@ -410,9 +410,9 @@ func TestPodDiscoveryUpdate(t *testing.T) {
func TestPodDiscoveryUpdateEmptyPodIP(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RolePod, NamespaceDiscovery{})
initialPod := makePods()
initialPod := makePods("default")
updatedPod := makePods()
updatedPod := makePods("default")
updatedPod.Status.PodIP = ""
k8sDiscoveryTest{
@ -444,7 +444,7 @@ func TestPodDiscoveryNamespaces(t *testing.T) {
discovery: n,
beforeRun: func() {
for _, ns := range []string{"ns1", "ns2"} {
pod := makePods()
pod := makePods("default")
pod.Namespace = ns
c.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
}
@ -463,7 +463,7 @@ func TestPodDiscoveryOwnNamespace(t *testing.T) {
discovery: n,
beforeRun: func() {
for _, ns := range []string{"own-ns", "non-own-ns"} {
pod := makePods()
pod := makePods("default")
pod.Namespace = ns
c.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
}
@ -485,7 +485,7 @@ func TestPodDiscoveryWithNodeMetadata(t *testing.T) {
nodes := makeNode("testnode", "", "", nodeLbls, nil)
c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{})
pods := makePods()
pods := makePods("default")
c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{})
},
expectedMaxItems: 2,
@ -507,7 +507,7 @@ func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) {
c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{})
},
afterStart: func() {
pods := makePods()
pods := makePods("default")
c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{})
nodes := makeNode("testnode", "", "", nodeLbls, nil)
@ -517,3 +517,114 @@ func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) {
expectedRes: expectedPodTargetGroupsWithNodeMeta("default", "testnode", nodeLbls),
}.Run(t)
}
func TestPodDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"app": "web", "tier": "frontend"}
nsAnnotations := map[string]string{"maintainer": "devops", "build": "v3.4.5"}
n, _ := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makePods(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("pod/%s/testpod", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_image": "testcontainer:latest",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
"__meta_kubernetes_pod_container_init": "false",
"__meta_kubernetes_pod_container_id": "docker://a1b2c3d4e5f6",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_maintainer": "devops",
"__meta_kubernetes_namespace_annotationpresent_maintainer": "true",
"__meta_kubernetes_namespace_annotation_build": "v3.4.5",
"__meta_kubernetes_namespace_annotationpresent_build": "true",
"__meta_kubernetes_namespace_label_app": "web",
"__meta_kubernetes_namespace_labelpresent_app": "true",
"__meta_kubernetes_namespace_label_tier": "frontend",
"__meta_kubernetes_namespace_labelpresent_tier": "true",
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_phase": "Running",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: fmt.Sprintf("pod/%s/testpod", ns),
},
},
}.Run(t)
}
func TestPodDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"app": "api", "tier": "backend"}
nsAnnotations := map[string]string{"maintainer": "platform", "build": "v4.5.6"}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makePods(ns))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
namespace.Labels["app"] = "service"
namespace.Labels["zone"] = "us-east"
namespace.Annotations["maintainer"] = "sre"
namespace.Annotations["deployment"] = "canary"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("pod/%s/testpod", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_image": "testcontainer:latest",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
"__meta_kubernetes_pod_container_init": "false",
"__meta_kubernetes_pod_container_id": "docker://a1b2c3d4e5f6",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_maintainer": "sre",
"__meta_kubernetes_namespace_annotationpresent_maintainer": "true",
"__meta_kubernetes_namespace_annotation_build": "v4.5.6",
"__meta_kubernetes_namespace_annotationpresent_build": "true",
"__meta_kubernetes_namespace_annotation_deployment": "canary",
"__meta_kubernetes_namespace_annotationpresent_deployment": "true",
"__meta_kubernetes_namespace_label_app": "service",
"__meta_kubernetes_namespace_labelpresent_app": "true",
"__meta_kubernetes_namespace_label_tier": "backend",
"__meta_kubernetes_namespace_labelpresent_tier": "true",
"__meta_kubernetes_namespace_label_zone": "us-east",
"__meta_kubernetes_namespace_labelpresent_zone": "true",
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_phase": "Running",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: fmt.Sprintf("pod/%s/testpod", ns),
},
},
}.Run(t)
}

View File

@ -33,14 +33,16 @@ import (
// Service implements discovery of Kubernetes services.
type Service struct {
logger *slog.Logger
informer cache.SharedInformer
store cache.Store
queue *workqueue.Type
logger *slog.Logger
informer cache.SharedIndexInformer
store cache.Store
queue *workqueue.Type
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
}
// NewService returns a new service discovery.
func NewService(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus.CounterVec) *Service {
func NewService(l *slog.Logger, inf cache.SharedIndexInformer, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Service {
if l == nil {
l = promslog.NewNopLogger()
}
@ -50,10 +52,12 @@ func NewService(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus
svcDeleteCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleDelete)
s := &Service{
logger: l,
informer: inf,
store: inf.GetStore(),
queue: workqueue.NewNamed(RoleService.String()),
logger: l,
informer: inf,
store: inf.GetStore(),
queue: workqueue.NewNamed(RoleService.String()),
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
}
_, err := s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -73,6 +77,21 @@ func NewService(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus
if err != nil {
l.Error("Error adding services event handler.", "err", err)
}
if s.withNamespaceMetadata {
_, err = s.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
s.enqueueNamespace(namespace.Name)
},
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
// No need to have additional handlers for them here.
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return s
}
@ -85,11 +104,28 @@ func (s *Service) enqueue(obj interface{}) {
s.queue.Add(key)
}
func (s *Service) enqueueNamespace(namespace string) {
services, err := s.informer.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
s.logger.Error("Error getting services in namespace", "namespace", namespace, "err", err)
return
}
for _, service := range services {
s.enqueue(service)
}
}
// Run implements the Discoverer interface.
func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer s.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) {
cacheSyncs := []cache.InformerSynced{s.informer.HasSynced}
if s.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, s.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
s.logger.Error("service informer unable to sync cache")
}
@ -175,6 +211,10 @@ func (s *Service) buildService(svc *apiv1.Service) *targetgroup.Group {
}
tg.Labels = serviceLabels(svc)
if s.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, s.namespaceInf, s.logger, svc.Namespace)
}
for _, port := range svc.Spec.Ports {
addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10))

View File

@ -52,11 +52,11 @@ func makeMultiPortService() *v1.Service {
}
}
func makeSuffixedService(suffix string) *v1.Service {
func makeService(namespace string) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("testservice%s", suffix),
Namespace: "default",
Name: "testservice",
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
@ -72,10 +72,6 @@ func makeSuffixedService(suffix string) *v1.Service {
}
}
func makeService() *v1.Service {
return makeSuffixedService("")
}
func makeExternalService() *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@ -124,7 +120,7 @@ func TestServiceDiscoveryAdd(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeService()
obj := makeService("default")
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
obj = makeExternalService()
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
@ -191,12 +187,12 @@ func TestServiceDiscoveryAdd(t *testing.T) {
func TestServiceDiscoveryDelete(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService())
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeService()
obj := makeService("default")
c.CoreV1().Services(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
},
expectedMaxItems: 2,
@ -210,7 +206,7 @@ func TestServiceDiscoveryDelete(t *testing.T) {
func TestServiceDiscoveryUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService())
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService("default"))
k8sDiscoveryTest{
discovery: n,
@ -261,8 +257,7 @@ func TestServiceDiscoveryNamespaces(t *testing.T) {
discovery: n,
afterStart: func() {
for _, ns := range []string{"ns1", "ns2"} {
obj := makeService()
obj.Namespace = ns
obj := makeService(ns)
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
}
},
@ -314,8 +309,7 @@ func TestServiceDiscoveryOwnNamespace(t *testing.T) {
discovery: n,
afterStart: func() {
for _, ns := range []string{"own-ns", "non-own-ns"} {
obj := makeService()
obj.Namespace = ns
obj := makeService(ns)
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
}
},
@ -350,8 +344,7 @@ func TestServiceDiscoveryAllNamespaces(t *testing.T) {
discovery: n,
afterStart: func() {
for _, ns := range []string{"own-ns", "non-own-ns"} {
obj := makeService()
obj.Namespace = ns
obj := makeService(ns)
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
}
},
@ -394,3 +387,98 @@ func TestServiceDiscoveryAllNamespaces(t *testing.T) {
},
}.Run(t)
}
func TestServiceDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"environment": "production", "team": "backend"}
nsAnnotations := map[string]string{"owner": "platform", "version": "v1.2.3"}
n, _ := makeDiscoveryWithMetadata(RoleService, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makeService(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("svc/%s/testservice", ns): {
Targets: []model.LabelSet{
{
"__address__": "testservice.test-ns.svc:30900",
"__meta_kubernetes_service_cluster_ip": "10.0.0.1",
"__meta_kubernetes_service_port_name": "testport",
"__meta_kubernetes_service_port_number": "30900",
"__meta_kubernetes_service_port_protocol": "TCP",
"__meta_kubernetes_service_type": "ClusterIP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_owner": "platform",
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
"__meta_kubernetes_namespace_annotation_version": "v1.2.3",
"__meta_kubernetes_namespace_annotationpresent_version": "true",
"__meta_kubernetes_namespace_label_environment": "production",
"__meta_kubernetes_namespace_labelpresent_environment": "true",
"__meta_kubernetes_namespace_label_team": "backend",
"__meta_kubernetes_namespace_labelpresent_team": "true",
"__meta_kubernetes_service_name": "testservice",
},
Source: fmt.Sprintf("svc/%s/testservice", ns),
},
},
}.Run(t)
}
func TestServiceDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"environment": "development", "team": "frontend"}
nsAnnotations := map[string]string{"owner": "devops", "version": "v2.1.0"}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RoleService, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makeService(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
namespace.Labels["environment"] = "staging"
namespace.Labels["region"] = "us-west"
namespace.Annotations["owner"] = "sre"
namespace.Annotations["cost-center"] = "engineering"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("svc/%s/testservice", ns): {
Targets: []model.LabelSet{
{
"__address__": "testservice.test-ns.svc:30900",
"__meta_kubernetes_service_cluster_ip": "10.0.0.1",
"__meta_kubernetes_service_port_name": "testport",
"__meta_kubernetes_service_port_number": "30900",
"__meta_kubernetes_service_port_protocol": "TCP",
"__meta_kubernetes_service_type": "ClusterIP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_owner": "sre",
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
"__meta_kubernetes_namespace_annotation_version": "v2.1.0",
"__meta_kubernetes_namespace_annotationpresent_version": "true",
"__meta_kubernetes_namespace_annotation_cost_center": "engineering",
"__meta_kubernetes_namespace_annotationpresent_cost_center": "true",
"__meta_kubernetes_namespace_label_environment": "staging",
"__meta_kubernetes_namespace_labelpresent_environment": "true",
"__meta_kubernetes_namespace_label_team": "frontend",
"__meta_kubernetes_namespace_labelpresent_team": "true",
"__meta_kubernetes_namespace_label_region": "us-west",
"__meta_kubernetes_namespace_labelpresent_region": "true",
"__meta_kubernetes_service_name": "testservice",
},
Source: fmt.Sprintf("svc/%s/testservice", ns),
},
},
}.Run(t)
}

View File

@ -1967,8 +1967,11 @@ namespaces:
# Optional metadata to attach to discovered targets. If omitted, no additional metadata is attached.
attach_metadata:
# Attaches node metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice.
# When set to true, Prometheus must have permissions to get Nodes.
# When set to true, Prometheus must have permissions to list/watch Nodes.
[ node: <boolean> | default = false ]
# Attaches namespace metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice, service, ingress.
# When set to true, Prometheus must have permissions to list/watch Namespaces.
[ namespace: <boolean> | default = false ]
# HTTP client settings, including authentication methods (such as basic auth and
# authorization), proxy configurations, TLS options, custom HTTP headers, etc.