KAFKA-15186 AppInfo metrics don't contain the client-id (#20493)

All Kafka component register AppInfo metrics to track the application
start time or commit-id etc. These metrics are useful for monitoring and
debugging. However, the AppInfo doesn't provide client-id, which is an
important information for custom metrics reporter.

The AppInfoParser class registers a JMX MBean with the provided
client-id, but when it adds metrics to the Metrics registry, the
client-id is not included. This KIP aims to add the client-id as a tag.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-09-21 16:28:03 +08:00 committed by GitHub
parent 07b786e5bf
commit 01fccd3513
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 110 additions and 34 deletions

View File

@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.Properties;
import javax.management.JMException;
@ -68,7 +69,7 @@ public class AppInfoParser {
AppInfo mBean = new AppInfo(nowMs);
server.registerMBean(mBean, name);
registerMetrics(metrics, mBean); // prefix will be added later by JmxReporter
registerMetrics(metrics, mBean, id); // prefix will be added later by JmxReporter
} catch (JMException e) {
log.warn("Error registering AppInfo mbean", e);
}
@ -81,7 +82,7 @@ public class AppInfoParser {
if (server.isRegistered(name))
server.unregisterMBean(name);
unregisterMetrics(metrics);
unregisterMetrics(metrics, id);
} catch (JMException e) {
log.warn("Error unregistering AppInfo mbean", e);
} finally {
@ -89,23 +90,36 @@ public class AppInfoParser {
}
}
private static MetricName metricName(Metrics metrics, String name) {
return metrics.metricName(name, "app-info", "Metric indicating " + name);
private static MetricName metricName(Metrics metrics, String name, Map<String, String> tags) {
return metrics.metricName(name, "app-info", "Metric indicating " + name, tags);
}
private static void registerMetrics(Metrics metrics, AppInfo appInfo) {
if (metrics != null) {
metrics.addMetric(metricName(metrics, "version"), (Gauge<String>) (config, now) -> appInfo.getVersion());
metrics.addMetric(metricName(metrics, "commit-id"), (Gauge<String>) (config, now) -> appInfo.getCommitId());
metrics.addMetric(metricName(metrics, "start-time-ms"), (Gauge<Long>) (config, now) -> appInfo.getStartTimeMs());
private static void registerMetrics(Metrics metrics, AppInfo appInfo, String clientId) {
if (metrics == null) return;
// Most Kafka clients (producer/consumer/admin) set the client-id tag in the metrics config.
// Although we dont explicitly parse client-id here, these metrics are automatically tagged with client-id.
metrics.addMetric(metricName(metrics, "version", Map.of()), (Gauge<String>) (config, now) -> appInfo.getVersion());
metrics.addMetric(metricName(metrics, "commit-id", Map.of()), (Gauge<String>) (config, now) -> appInfo.getCommitId());
metrics.addMetric(metricName(metrics, "start-time-ms", Map.of()), (Gauge<Long>) (config, now) -> appInfo.getStartTimeMs());
// MirrorMaker/Worker doesn't set client-id tag into the metrics config, so we need to set it here.
if (!metrics.config().tags().containsKey("client-id") && clientId != null) {
metrics.addMetric(metricName(metrics, "version", Map.of("client-id", clientId)), (Gauge<String>) (config, now) -> appInfo.getVersion());
metrics.addMetric(metricName(metrics, "commit-id", Map.of("client-id", clientId)), (Gauge<String>) (config, now) -> appInfo.getCommitId());
metrics.addMetric(metricName(metrics, "start-time-ms", Map.of("client-id", clientId)), (Gauge<Long>) (config, now) -> appInfo.getStartTimeMs());
}
}
private static void unregisterMetrics(Metrics metrics) {
if (metrics != null) {
metrics.removeMetric(metricName(metrics, "version"));
metrics.removeMetric(metricName(metrics, "commit-id"));
metrics.removeMetric(metricName(metrics, "start-time-ms"));
private static void unregisterMetrics(Metrics metrics, String clientId) {
if (metrics == null) return;
metrics.removeMetric(metricName(metrics, "version", Map.of()));
metrics.removeMetric(metricName(metrics, "commit-id", Map.of()));
metrics.removeMetric(metricName(metrics, "start-time-ms", Map.of()));
if (!metrics.config().tags().containsKey("client-id") && clientId != null) {
metrics.removeMetric(metricName(metrics, "version", Map.of("client-id", clientId)));
metrics.removeMetric(metricName(metrics, "commit-id", Map.of("client-id", clientId)));
metrics.removeMetric(metricName(metrics, "start-time-ms", Map.of("client-id", clientId)));
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.jupiter.api.AfterEach;
@ -23,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.lang.management.ManagementFactory;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
@ -41,38 +43,49 @@ public class AppInfoParserTest {
private static final String METRICS_PREFIX = "app-info-test";
private static final String METRICS_ID = "test";
private Metrics metrics;
private MBeanServer mBeanServer;
@BeforeEach
public void setUp() {
metrics = new Metrics(new MockTime(1));
mBeanServer = ManagementFactory.getPlatformMBeanServer();
}
@AfterEach
public void tearDown() {
metrics.close();
public void tearDown() throws JMException {
if (mBeanServer.isRegistered(expectedAppObjectName())) {
mBeanServer.unregisterMBean(expectedAppObjectName());
}
}
@Test
public void testRegisterAppInfoRegistersMetrics() throws JMException {
registerAppInfo();
registerAppInfoMultipleTimes();
try (Metrics metrics = new Metrics(new MockTime(1))) {
registerAppInfo(metrics);
registerAppInfoMultipleTimes(metrics);
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
}
}
@Test
public void testUnregisterAppInfoUnregistersMetrics() throws JMException {
registerAppInfo();
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
try (Metrics metrics = new Metrics(new MockTime(1))) {
registerAppInfo(metrics);
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
assertFalse(mBeanServer.isRegistered(expectedAppObjectName()));
assertNull(metrics.metric(metrics.metricName("commit-id", "app-info")));
assertNull(metrics.metric(metrics.metricName("version", "app-info")));
assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info")));
assertFalse(mBeanServer.isRegistered(expectedAppObjectName()));
assertNull(metrics.metric(metrics.metricName("commit-id", "app-info")));
assertNull(metrics.metric(metrics.metricName("version", "app-info")));
assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info")));
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
assertNull(metrics.metric(metrics.metricName("commit-id", "app-info", idTag)));
assertNull(metrics.metric(metrics.metricName("version", "app-info", idTag)));
assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)));
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
}
}
private void registerAppInfo() throws JMException {
private void registerAppInfo(Metrics metrics) throws JMException {
assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId());
assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion());
@ -82,9 +95,15 @@ public class AppInfoParserTest {
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue());
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info")).metricValue());
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue());
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
assertTrue(mBeanServer.isRegistered(expectedAppObjectName()));
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue());
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue());
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue());
}
private void registerAppInfoMultipleTimes() throws JMException {
private void registerAppInfoMultipleTimes(Metrics metrics) throws JMException {
assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId());
assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion());
@ -95,9 +114,37 @@ public class AppInfoParserTest {
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue());
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info")).metricValue());
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue());
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue());
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue());
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue());
}
private ObjectName expectedAppObjectName() throws MalformedObjectNameException {
return new ObjectName(METRICS_PREFIX + ":type=app-info,id=" + METRICS_ID);
}
@Test
public void testClientIdWontAddRepeatedly() throws JMException {
Map<String, String> tags = Map.of(
"client-id", METRICS_ID,
"other-tag", "tag-value",
"another-tag", "another-value"
);
Metrics metrics = new Metrics(new MetricConfig().tags(tags), new MockTime(1));
AppInfoParser.registerAppInfo(METRICS_PREFIX, METRICS_ID, metrics, EXPECTED_START_MS);
assertTrue(mBeanServer.isRegistered(expectedAppObjectName()));
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", tags)).metricValue());
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", tags)).metricValue());
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", tags)).metricValue());
Map<String, String> idTag = Map.of("client-id", METRICS_ID);
assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info", idTag)).metricValue());
assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info", idTag)).metricValue());
assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info", idTag)).metricValue());
metrics.close();
AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics);
}
}

View File

@ -147,6 +147,21 @@
and defaults to false.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/nyH1D">KIP-853</a>.
</li>
<li>
The AppInfo metrics will deprecate the following metric names, which will be removed in Kafka 5.0:
<ul>
<li><code>[name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={}]</code></li>
<li><code>[name=commit-id, group=app-info, description=Metric indicating commit-id, tags={}]</code></li>
<li><code>[name=version, group=app-info, description=Metric indicating version, tags={}]</code></li>
</ul>
In addition, the <code>client-id</code> will be added to the tags of these metrics. The new metric names will be:
<ul>
<li><code>[name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=...}]</code></li>
<li><code>[name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=...}]</code></li>
<li><code>[name=version, group=app-info, description=Metric indicating version, tags={client-id=...}]</code></li>
</ul>
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/3gn0Ew">KIP-1120</a>.
</li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
@ -423,12 +438,12 @@
<li>The <code>--whitelist</code> option was removed from the <code>kafka-console-consumer</code> command line tool.
Please use <code>--include</code> instead.
</li>
<li>Redirections from the old tools packages have been removed:
<code>kafka.admin.FeatureCommand</code>,
<code>kafka.tools.ClusterTool</code>,
<li>Redirections from the old tools packages have been removed:
<code>kafka.admin.FeatureCommand</code>,
<code>kafka.tools.ClusterTool</code>,
<code>kafka.tools.EndToEndLatency</code>,
<code>kafka.tools.StateChangeLogMerger</code>,
<code>kafka.tools.StreamsResetter</code>,
<code>kafka.tools.StateChangeLogMerger</code>,
<code>kafka.tools.StreamsResetter</code>,
<code>kafka.tools.JmxTool</code>.
</li>
<li>The <code>--authorizer</code>, <code>--authorizer-properties</code>, and <code>--zk-tls-config-file</code> options were removed from the <code>kafka-acls</code> command line tool.
@ -492,7 +507,7 @@
</li>
<li>The deprecated <code>sendOffsetsToTransaction(Map&lt;TopicPartition, OffsetAndMetadata&gt;, String)</code> method has been removed from the Producer API.
</li>
<li>The default <code>linger.ms</code> changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in
<li>The default <code>linger.ms</code> changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in
similar or lower producer latency despite the increased linger.
</li>
</ul>