From 01fccd35138b407d87828ceb557d33c4b5450cac Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Sun, 21 Sep 2025 16:28:03 +0800 Subject: [PATCH] KAFKA-15186 AppInfo metrics don't contain the client-id (#20493) All Kafka component register AppInfo metrics to track the application start time or commit-id etc. These metrics are useful for monitoring and debugging. However, the AppInfo doesn't provide client-id, which is an important information for custom metrics reporter. The AppInfoParser class registers a JMX MBean with the provided client-id, but when it adds metrics to the Metrics registry, the client-id is not included. This KIP aims to add the client-id as a tag. Reviewers: Chia-Ping Tsai --- .../kafka/common/utils/AppInfoParser.java | 42 +++++++---- .../kafka/common/utils/AppInfoParserTest.java | 75 +++++++++++++++---- docs/upgrade.html | 27 +++++-- 3 files changed, 110 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java index e23bda36d5f..cc2d7b75f1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.InputStream; import java.lang.management.ManagementFactory; +import java.util.Map; import java.util.Properties; import javax.management.JMException; @@ -68,7 +69,7 @@ public class AppInfoParser { AppInfo mBean = new AppInfo(nowMs); server.registerMBean(mBean, name); - registerMetrics(metrics, mBean); // prefix will be added later by JmxReporter + registerMetrics(metrics, mBean, id); // prefix will be added later by JmxReporter } catch (JMException e) { log.warn("Error registering AppInfo mbean", e); } @@ -81,7 +82,7 @@ public class AppInfoParser { if (server.isRegistered(name)) server.unregisterMBean(name); - unregisterMetrics(metrics); + unregisterMetrics(metrics, id); } catch (JMException e) { log.warn("Error unregistering AppInfo mbean", e); } finally { @@ -89,23 +90,36 @@ public class AppInfoParser { } } - private static MetricName metricName(Metrics metrics, String name) { - return metrics.metricName(name, "app-info", "Metric indicating " + name); + private static MetricName metricName(Metrics metrics, String name, Map tags) { + return metrics.metricName(name, "app-info", "Metric indicating " + name, tags); } - private static void registerMetrics(Metrics metrics, AppInfo appInfo) { - if (metrics != null) { - metrics.addMetric(metricName(metrics, "version"), (Gauge) (config, now) -> appInfo.getVersion()); - metrics.addMetric(metricName(metrics, "commit-id"), (Gauge) (config, now) -> appInfo.getCommitId()); - metrics.addMetric(metricName(metrics, "start-time-ms"), (Gauge) (config, now) -> appInfo.getStartTimeMs()); + private static void registerMetrics(Metrics metrics, AppInfo appInfo, String clientId) { + if (metrics == null) return; + // Most Kafka clients (producer/consumer/admin) set the client-id tag in the metrics config. + // Although we don’t explicitly parse client-id here, these metrics are automatically tagged with client-id. + metrics.addMetric(metricName(metrics, "version", Map.of()), (Gauge) (config, now) -> appInfo.getVersion()); + metrics.addMetric(metricName(metrics, "commit-id", Map.of()), (Gauge) (config, now) -> appInfo.getCommitId()); + metrics.addMetric(metricName(metrics, "start-time-ms", Map.of()), (Gauge) (config, now) -> appInfo.getStartTimeMs()); + // MirrorMaker/Worker doesn't set client-id tag into the metrics config, so we need to set it here. + if (!metrics.config().tags().containsKey("client-id") && clientId != null) { + metrics.addMetric(metricName(metrics, "version", Map.of("client-id", clientId)), (Gauge) (config, now) -> appInfo.getVersion()); + metrics.addMetric(metricName(metrics, "commit-id", Map.of("client-id", clientId)), (Gauge) (config, now) -> appInfo.getCommitId()); + metrics.addMetric(metricName(metrics, "start-time-ms", Map.of("client-id", clientId)), (Gauge) (config, now) -> appInfo.getStartTimeMs()); } } - private static void unregisterMetrics(Metrics metrics) { - if (metrics != null) { - metrics.removeMetric(metricName(metrics, "version")); - metrics.removeMetric(metricName(metrics, "commit-id")); - metrics.removeMetric(metricName(metrics, "start-time-ms")); + private static void unregisterMetrics(Metrics metrics, String clientId) { + if (metrics == null) return; + + metrics.removeMetric(metricName(metrics, "version", Map.of())); + metrics.removeMetric(metricName(metrics, "commit-id", Map.of())); + metrics.removeMetric(metricName(metrics, "start-time-ms", Map.of())); + + if (!metrics.config().tags().containsKey("client-id") && clientId != null) { + metrics.removeMetric(metricName(metrics, "version", Map.of("client-id", clientId))); + metrics.removeMetric(metricName(metrics, "commit-id", Map.of("client-id", clientId))); + metrics.removeMetric(metricName(metrics, "start-time-ms", Map.of("client-id", clientId))); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java index aac13f299fe..7e153be5862 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.utils; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.junit.jupiter.api.AfterEach; @@ -23,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.lang.management.ManagementFactory; +import java.util.Map; import javax.management.JMException; import javax.management.MBeanServer; @@ -41,38 +43,49 @@ public class AppInfoParserTest { private static final String METRICS_PREFIX = "app-info-test"; private static final String METRICS_ID = "test"; - private Metrics metrics; private MBeanServer mBeanServer; @BeforeEach public void setUp() { - metrics = new Metrics(new MockTime(1)); mBeanServer = ManagementFactory.getPlatformMBeanServer(); } @AfterEach - public void tearDown() { - metrics.close(); + public void tearDown() throws JMException { + if (mBeanServer.isRegistered(expectedAppObjectName())) { + mBeanServer.unregisterMBean(expectedAppObjectName()); + } } @Test public void testRegisterAppInfoRegistersMetrics() throws JMException { - registerAppInfo(); - registerAppInfoMultipleTimes(); + try (Metrics metrics = new Metrics(new MockTime(1))) { + registerAppInfo(metrics); + registerAppInfoMultipleTimes(metrics); + AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics); + } } @Test public void testUnregisterAppInfoUnregistersMetrics() throws JMException { - registerAppInfo(); - AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics); + try (Metrics metrics = new Metrics(new MockTime(1))) { + registerAppInfo(metrics); + AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics); - assertFalse(mBeanServer.isRegistered(expectedAppObjectName())); - assertNull(metrics.metric(metrics.metricName("commit-id", "app-info"))); - assertNull(metrics.metric(metrics.metricName("version", "app-info"))); - assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info"))); + assertFalse(mBeanServer.isRegistered(expectedAppObjectName())); + assertNull(metrics.metric(metrics.metricName("commit-id", "app-info"))); + assertNull(metrics.metric(metrics.metricName("version", "app-info"))); + assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info"))); + + Map idTag = Map.of("client-id", METRICS_ID); + assertNull(metrics.metric(metrics.metricName("commit-id", "app-info", idTag))); + assertNull(metrics.metric(metrics.metricName("version", "app-info", idTag))); + assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag))); + AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics); + } } - private void registerAppInfo() throws JMException { + private void registerAppInfo(Metrics metrics) throws JMException { assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId()); assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion()); @@ -82,9 +95,15 @@ public class AppInfoParserTest { assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue()); assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info")).metricValue()); assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue()); + + Map idTag = Map.of("client-id", METRICS_ID); + assertTrue(mBeanServer.isRegistered(expectedAppObjectName())); + assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue()); + assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue()); + assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue()); } - private void registerAppInfoMultipleTimes() throws JMException { + private void registerAppInfoMultipleTimes(Metrics metrics) throws JMException { assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId()); assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion()); @@ -95,9 +114,37 @@ public class AppInfoParserTest { assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue()); assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info")).metricValue()); assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue()); + + Map idTag = Map.of("client-id", METRICS_ID); + assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue()); + assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue()); + assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue()); } private ObjectName expectedAppObjectName() throws MalformedObjectNameException { return new ObjectName(METRICS_PREFIX + ":type=app-info,id=" + METRICS_ID); } + + @Test + public void testClientIdWontAddRepeatedly() throws JMException { + Map tags = Map.of( + "client-id", METRICS_ID, + "other-tag", "tag-value", + "another-tag", "another-value" + ); + Metrics metrics = new Metrics(new MetricConfig().tags(tags), new MockTime(1)); + AppInfoParser.registerAppInfo(METRICS_PREFIX, METRICS_ID, metrics, EXPECTED_START_MS); + + assertTrue(mBeanServer.isRegistered(expectedAppObjectName())); + assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", tags)).metricValue()); + assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", tags)).metricValue()); + assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", tags)).metricValue()); + + Map idTag = Map.of("client-id", METRICS_ID); + assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue()); + assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue()); + assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue()); + metrics.close(); + AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics); + } } diff --git a/docs/upgrade.html b/docs/upgrade.html index ec417e88c6b..66e05d90a5d 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -147,6 +147,21 @@ and defaults to false. For further details, please refer to KIP-853. +
  • + The AppInfo metrics will deprecate the following metric names, which will be removed in Kafka 5.0: +
      +
    • [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={}]
    • +
    • [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={}]
    • +
    • [name=version, group=app-info, description=Metric indicating version, tags={}]
    • +
    + In addition, the client-id will be added to the tags of these metrics. The new metric names will be: +
      +
    • [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=...}]
    • +
    • [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=...}]
    • +
    • [name=version, group=app-info, description=Metric indicating version, tags={client-id=...}]
    • +
    + For further details, please refer to KIP-1120. +
  • Upgrading to 4.1.0

    @@ -423,12 +438,12 @@
  • The --whitelist option was removed from the kafka-console-consumer command line tool. Please use --include instead.
  • -
  • Redirections from the old tools packages have been removed: - kafka.admin.FeatureCommand, - kafka.tools.ClusterTool, +
  • Redirections from the old tools packages have been removed: + kafka.admin.FeatureCommand, + kafka.tools.ClusterTool, kafka.tools.EndToEndLatency, - kafka.tools.StateChangeLogMerger, - kafka.tools.StreamsResetter, + kafka.tools.StateChangeLogMerger, + kafka.tools.StreamsResetter, kafka.tools.JmxTool.
  • The --authorizer, --authorizer-properties, and --zk-tls-config-file options were removed from the kafka-acls command line tool. @@ -492,7 +507,7 @@
  • The deprecated sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>, String) method has been removed from the Producer API.
  • -
  • The default linger.ms changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in +
  • The default linger.ms changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in similar or lower producer latency despite the increased linger.