diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index f8adb5808b1..b3d1b928cc6 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -94,6 +94,7 @@
+
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index 69d2353fb83..03f1c9b929e 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
-import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics}
+import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics}
import java.net.InetSocketAddress
import java.util.Arrays
@@ -116,6 +116,7 @@ class SharedServer(
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
@volatile var controllerServerMetrics: ControllerMetadataMetrics = _
+ @volatile var nodeMetrics: NodeMetrics = _
@volatile var loader: MetadataLoader = _
private val snapshotsDisabledReason = new AtomicReference[String](null)
@volatile var snapshotEmitter: SnapshotEmitter = _
@@ -298,6 +299,7 @@ class SharedServer(
raftManager = _raftManager
_raftManager.startup()
+ nodeMetrics = new NodeMetrics(metrics, controllerConfig.unstableFeatureVersionsEnabled)
metadataLoaderMetrics = if (brokerMetrics != null) {
new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
@@ -387,6 +389,8 @@ class SharedServer(
controllerServerMetrics = null
Utils.closeQuietly(brokerMetrics, "broker metrics")
brokerMetrics = null
+ Utils.closeQuietly(nodeMetrics, "node metrics")
+ nodeMetrics = null
Utils.closeQuietly(metrics, "metrics")
metrics = null
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this)
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 4fabd1863b6..a1513a0c4c0 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -17,6 +17,8 @@
package org.apache.kafka.image.loader;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
@@ -30,9 +32,12 @@ import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.SnapshotReader;
@@ -345,7 +350,24 @@ public class MetadataLoader implements RaftClient.Listener
}
}
metrics.updateLastAppliedImageProvenance(image.provenance());
- metrics.setCurrentMetadataVersion(image.features().metadataVersionOrThrow());
+ MetadataVersion metadataVersion = image.features().metadataVersionOrThrow();
+ metrics.setCurrentMetadataVersion(metadataVersion);
+
+ // Set the metadata version feature level, since it is handled separately from other features
+ metrics.recordFinalizedFeatureLevel(
+ MetadataVersion.FEATURE_NAME,
+ metadataVersion.featureLevel()
+ );
+
+ // Set all production feature levels from the image
+ metrics.maybeRemoveFinalizedFeatureLevelMetrics(image.features().finalizedVersions());
+ for (var featureEntry : image.features().finalizedVersions().entrySet()) {
+ metrics.recordFinalizedFeatureLevel(
+ featureEntry.getKey(),
+ featureEntry.getValue()
+ );
+ }
+
if (!uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
}
@@ -357,6 +379,7 @@ public class MetadataLoader implements RaftClient.Listener
try (reader) {
while (reader.hasNext()) {
Batch batch = reader.next();
+ loadControlRecords(batch);
long elapsedNs = batchLoader.loadBatch(batch, currentLeaderAndEpoch);
metrics.updateBatchSize(batch.records().size());
metrics.updateBatchProcessingTimeNs(elapsedNs);
@@ -418,6 +441,7 @@ public class MetadataLoader implements RaftClient.Listener
int snapshotIndex = 0;
while (reader.hasNext()) {
Batch batch = reader.next();
+ loadControlRecords(batch);
for (ApiMessageAndVersion record : batch.records()) {
try {
delta.replay(record.message());
@@ -435,6 +459,15 @@ public class MetadataLoader implements RaftClient.Listener
time.nanoseconds() - startNs);
}
+ void loadControlRecords(Batch batch) {
+ for (ControlRecord controlRecord : batch.controlRecords()) {
+ if (controlRecord.type() == ControlRecordType.KRAFT_VERSION) {
+ final var kRaftVersionRecord = (KRaftVersionRecord) controlRecord.message();
+ metrics.recordFinalizedFeatureLevel(KRaftVersion.FEATURE_NAME, kRaftVersionRecord.kRaftVersion());
+ }
+ }
+ }
+
@Override
public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
eventQueue.append(() -> {
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
index 6fc346ed10a..a819e9230a5 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
@@ -18,6 +18,7 @@
package org.apache.kafka.image.loader.metrics;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
@@ -25,8 +26,12 @@ import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,12 +45,15 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
"MetadataLoader", "CurrentMetadataVersion");
private static final MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName(
"MetadataLoader", "HandleLoadSnapshotCount");
- public static final MetricName CURRENT_CONTROLLER_ID = getMetricName(
+ private static final MetricName CURRENT_CONTROLLER_ID = getMetricName(
"MetadataLoader", "CurrentControllerId");
+ private static final String FINALIZED_LEVEL_METRIC_NAME = "FinalizedLevel";
+ private static final String FEATURE_NAME_TAG = "featureName";
private final Optional registry;
private final AtomicReference currentMetadataVersion =
new AtomicReference<>(MetadataVersion.MINIMUM_VERSION);
+ private final Map finalizedFeatureLevels = new ConcurrentHashMap<>();
private final AtomicInteger currentControllerId = new AtomicInteger(-1);
private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0);
private final Consumer batchProcessingTimeNsUpdater;
@@ -90,6 +98,32 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
}));
}
+ private void addFinalizedFeatureLevelMetric(String featureName) {
+ registry.ifPresent(r -> r.newGauge(
+ getFeatureNameTagMetricName(
+ "MetadataLoader",
+ FINALIZED_LEVEL_METRIC_NAME,
+ featureName
+ ),
+ new Gauge() {
+ @Override
+ public Short value() {
+ return finalizedFeatureLevel(featureName);
+ }
+ }
+ ));
+ }
+
+ private void removeFinalizedFeatureLevelMetric(String featureName) {
+ registry.ifPresent(r -> r.removeMetric(
+ getFeatureNameTagMetricName(
+ "MetadataLoader",
+ FINALIZED_LEVEL_METRIC_NAME,
+ featureName
+ )
+ ));
+ }
+
/**
* Update the batch processing time histogram.
*/
@@ -142,6 +176,48 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
return this.handleLoadSnapshotCount.get();
}
+ /**
+     * Remove the FinalizedLevel metric for features that are no longer part of the
+ * current features image.
+ * Note that metadata.version and kraft.version are not included in
+ * the features image, so they are not removed.
+ * @param newFinalizedLevels The new finalized feature levels from the features image
+ */
+ public void maybeRemoveFinalizedFeatureLevelMetrics(Map newFinalizedLevels) {
+ final var iter = finalizedFeatureLevels.keySet().iterator();
+ while (iter.hasNext()) {
+ final var featureName = iter.next();
+ if (newFinalizedLevels.containsKey(featureName) ||
+ featureName.equals(MetadataVersion.FEATURE_NAME) ||
+ featureName.equals(KRaftVersion.FEATURE_NAME)) {
+ continue;
+ }
+ removeFinalizedFeatureLevelMetric(featureName);
+ iter.remove();
+ }
+ }
+
+ /**
+ * Record the finalized feature level and ensure the metric is registered.
+ *
+ * @param featureName The name of the feature
+ * @param featureLevel The finalized level for the feature
+ */
+ public void recordFinalizedFeatureLevel(String featureName, short featureLevel) {
+ final var metricNotRegistered = finalizedFeatureLevels.put(featureName, featureLevel) == null;
+ if (metricNotRegistered) addFinalizedFeatureLevelMetric(featureName);
+ }
+
+ /**
+ * Get the finalized feature level for a feature.
+ *
+ * @param featureName The name of the feature
+ * @return The finalized level for the feature
+ */
+ public short finalizedFeatureLevel(String featureName) {
+ return finalizedFeatureLevels.get(featureName);
+ }
+
@Override
public void close() {
registry.ifPresent(r -> List.of(
@@ -149,9 +225,37 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
CURRENT_CONTROLLER_ID,
HANDLE_LOAD_SNAPSHOT_COUNT
).forEach(r::removeMetric));
+ for (var featureName : finalizedFeatureLevels.keySet()) {
+ removeFinalizedFeatureLevelMetric(featureName);
+ }
}
private static MetricName getMetricName(String type, String name) {
return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
}
+
+ private static MetricName getFeatureNameTagMetricName(String type, String name, String featureName) {
+ LinkedHashMap featureNameTag = new LinkedHashMap<>();
+ featureNameTag.put(FEATURE_NAME_TAG, sanitizeFeatureName(featureName));
+ return KafkaYammerMetrics.getMetricName("kafka.server", type, name, featureNameTag);
+ }
+
+ /**
+ * Sanitize the feature name to be used as a tag in metrics by
+ * converting from dot notation to camel case.
+ * The conversion is done to be consistent with other Yammer metrics.
+ *
+ * @param featureName The feature name in dot notation.
+ * @return The sanitized feature name in camel case.
+ */
+ private static String sanitizeFeatureName(String featureName) {
+ final var words = featureName.split("\\.");
+ final var builder = new StringBuilder(words[0]);
+ for (int i = 1; i < words.length; i++) {
+ final var word = words[i];
+ builder.append(Character.toUpperCase(word.charAt(0)))
+ .append(word.substring(1).toLowerCase(Locale.ROOT));
+ }
+ return builder.toString();
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 31960252bd8..b6069c5de75 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
@@ -36,6 +37,7 @@ import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.fault.MockFaultHandler;
@@ -59,6 +61,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -355,6 +358,8 @@ public class MetadataLoaderTest {
publishers.get(0).latestSnapshotManifest);
assertEquals(MINIMUM_VERSION,
loader.metrics().currentMetadataVersion());
+ assertEquals(MINIMUM_VERSION.featureLevel(),
+ loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
}
assertTrue(publishers.get(0).closed);
assertEquals(Optional.of(MINIMUM_VERSION), publishers.get(0).latestImage.features().metadataVersion());
@@ -508,6 +513,65 @@ public class MetadataLoaderTest {
faultHandler.maybeRethrowFirstException();
}
+ /**
+ * Test the kraft.version finalized level value is correct.
+     * @throws Exception if loading the snapshot or handling commits fails
+ */
+ @Test
+ public void testKRaftVersionFinalizedLevelMetric() throws Exception {
+ MockFaultHandler faultHandler = new MockFaultHandler("testKRaftVersionFinalizedLevelMetric");
+ MockTime time = new MockTime();
+ List publishers = List.of(new MockPublisher());
+ try (MetadataLoader loader = new MetadataLoader.Builder().
+ setFaultHandler(faultHandler).
+ setTime(time).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
+ build()) {
+ loader.installPublishers(publishers).get();
+ loadTestSnapshot(loader, 200);
+ assertThrows(
+ NullPointerException.class,
+ () -> loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME)
+ );
+ publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
+ MockBatchReader batchReader = new MockBatchReader(
+ 300,
+ List.of(
+ Batch.control(
+ 300,
+ 100,
+ 4000,
+ 10,
+ List.of(ControlRecord.of(new KRaftVersionRecord()))
+ )
+ )
+ ).setTime(time);
+ loader.handleCommit(batchReader);
+ loader.waitForAllEventsToBeHandled();
+ assertTrue(batchReader.closed);
+ assertEquals(300L, loader.lastAppliedOffset());
+ assertEquals((short) 0, loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME));
+ loader.handleCommit(new MockBatchReader(301, List.of(
+ MockBatchReader.newBatch(301, 100, List.of(
+ new ApiMessageAndVersion(new RemoveTopicRecord().
+ setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))))));
+ loader.waitForAllEventsToBeHandled();
+ assertEquals(301L, loader.lastAppliedOffset());
+ assertEquals((short) 0, loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME));
+ loader.handleCommit(new MockBatchReader(302, List.of(
+ Batch.control(
+ 302,
+ 100,
+ 4000,
+ 10,
+ List.of(ControlRecord.of(new KRaftVersionRecord().setKRaftVersion((short) 1)))))));
+ loader.waitForAllEventsToBeHandled();
+ assertEquals(302L, loader.lastAppliedOffset());
+ assertEquals((short) 1, loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME));
+ }
+ assertTrue(publishers.get(0).closed);
+ }
+
/**
* Test that the lastAppliedOffset moves forward as expected.
*/
@@ -640,12 +704,16 @@ public class MetadataLoaderTest {
assertEquals(200L, loader.lastAppliedOffset());
assertEquals(MINIMUM_VERSION.featureLevel(),
loader.metrics().currentMetadataVersion().featureLevel());
+ assertEquals(MINIMUM_VERSION.featureLevel(),
+ loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
assertFalse(publishers.get(0).latestDelta.image().isEmpty());
loadTestSnapshot2(loader, 400);
assertEquals(400L, loader.lastAppliedOffset());
assertEquals(MetadataVersion.latestProduction().featureLevel(),
loader.metrics().currentMetadataVersion().featureLevel());
+ assertEquals(MetadataVersion.latestProduction().featureLevel(),
+ loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
// Make sure the topic in the initial snapshot was overwritten by loading the new snapshot.
assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
@@ -659,6 +727,8 @@ public class MetadataLoaderTest {
loader.waitForAllEventsToBeHandled();
assertEquals(IBP_3_5_IV0.featureLevel(),
loader.metrics().currentMetadataVersion().featureLevel());
+ assertEquals(IBP_3_5_IV0.featureLevel(),
+ loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
}
faultHandler.maybeRethrowFirstException();
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
index e200b6ce551..3acf3be23dd 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.image.loader.metrics;
import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.TransactionVersion;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
@@ -26,16 +29,15 @@ import com.yammer.metrics.core.MetricsRegistry;
import org.junit.jupiter.api.Test;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
-
public class MetadataLoaderMetricsTest {
private static class FakeMetadataLoaderMetrics implements AutoCloseable {
final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
@@ -72,7 +74,22 @@ public class MetadataLoaderMetricsTest {
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
- ));
+ )
+ );
+
+ // Record some feature levels and verify their metrics are registered
+ fakeMetrics.metrics.recordFinalizedFeatureLevel("metadata.version", (short) 3);
+ fakeMetrics.metrics.recordFinalizedFeatureLevel("kraft.version", (short) 4);
+
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Set.of(
+ "kafka.server:type=MetadataLoader,name=CurrentControllerId",
+ "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+ "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion"
+ )
+ );
}
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of());
@@ -114,7 +131,7 @@ public class MetadataLoaderMetricsTest {
MetricsRegistry registry = new MetricsRegistry();
try {
try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
- fakeMetrics.metrics.setCurrentMetadataVersion(MINIMUM_VERSION);
+ fakeMetrics.metrics.setCurrentMetadataVersion(MetadataVersion.IBP_3_7_IV0);
fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
@@ -122,7 +139,7 @@ public class MetadataLoaderMetricsTest {
Gauge currentMetadataVersion = (Gauge) registry
.allMetrics()
.get(metricName("MetadataLoader", "CurrentMetadataVersion"));
- assertEquals(MINIMUM_VERSION.featureLevel(),
+ assertEquals(MetadataVersion.IBP_3_7_IV0.featureLevel(),
currentMetadataVersion.value().shortValue());
@SuppressWarnings("unchecked")
@@ -153,8 +170,100 @@ public class MetadataLoaderMetricsTest {
}
}
+ @Test
+ public void testFinalizedFeatureLevelMetrics() {
+ MetricsRegistry registry = new MetricsRegistry();
+ try {
+ try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+ // Initially no finalized level metrics should be registered
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Set.of(
+ "kafka.server:type=MetadataLoader,name=CurrentControllerId",
+ "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+ "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+ )
+ );
+
+ // Record metadata version and verify its metric
+ fakeMetrics.metrics.recordFinalizedFeatureLevel(MetadataVersion.FEATURE_NAME, (short) 5);
+ @SuppressWarnings("unchecked")
+ Gauge finalizedMetadataVersion = (Gauge) registry
+ .allMetrics()
+ .get(metricName("MetadataLoader", "FinalizedLevel", "featureName=metadataVersion"));
+ assertEquals((short) 5, finalizedMetadataVersion.value());
+
+ // Record KRaft version and verify its metric
+ fakeMetrics.metrics.recordFinalizedFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
+ @SuppressWarnings("unchecked")
+ Gauge finalizedKRaftVersion = (Gauge) registry
+ .allMetrics()
+ .get(metricName("MetadataLoader", "FinalizedLevel", "featureName=kraftVersion"));
+ assertEquals((short) 1, finalizedKRaftVersion.value());
+
+ // Record transaction version and verify its metric
+ fakeMetrics.metrics.recordFinalizedFeatureLevel(TransactionVersion.FEATURE_NAME, (short) 1);
+ @SuppressWarnings("unchecked")
+ Gauge finalizedTransactionVersion = (Gauge) registry
+ .allMetrics()
+ .get(metricName("MetadataLoader", "FinalizedLevel", "featureName=transactionVersion"));
+ assertEquals((short) 1, finalizedTransactionVersion.value());
+
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Set.of(
+ "kafka.server:type=MetadataLoader,name=CurrentControllerId",
+ "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+ "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
+ )
+ );
+
+ // When a feature's finalized level is not present in the new image, its metric should be removed
+ // This does not apply to metadataVersion and kraftVersion
+ fakeMetrics.metrics.maybeRemoveFinalizedFeatureLevelMetrics(Map.of());
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Set.of(
+ "kafka.server:type=MetadataLoader,name=CurrentControllerId",
+ "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+ "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion"
+ )
+ );
+
+ // Set the finalized feature level and check the metric is added back with its correct value
+ fakeMetrics.metrics.recordFinalizedFeatureLevel(TransactionVersion.FEATURE_NAME, (short) 2);
+ @SuppressWarnings("unchecked")
+ Gauge finalizedTransactionVersion2 = (Gauge) registry
+ .allMetrics()
+ .get(metricName("MetadataLoader", "FinalizedLevel", "featureName=transactionVersion"));
+ assertEquals((short) 2, finalizedTransactionVersion2.value());
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Set.of(
+ "kafka.server:type=MetadataLoader,name=CurrentControllerId",
+ "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+ "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
+ "kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
+ )
+ );
+ }
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Set.of());
+ } finally {
+ registry.shutdown();
+ }
+ }
+
private static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name);
return new MetricName("kafka.server", type, name, null, mBeanName);
}
+
+ private static MetricName metricName(String type, String name, String scope) {
+ String mBeanName = String.format("kafka.server:type=%s,name=%s,%s", type, name, scope);
+ return new MetricName("kafka.server", type, name, scope, mBeanName);
+ }
}
diff --git a/server/src/main/java/org/apache/kafka/server/metrics/NodeMetrics.java b/server/src/main/java/org/apache/kafka/server/metrics/NodeMetrics.java
new file mode 100644
index 00000000000..8d7b3bb68e2
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/metrics/NodeMetrics.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.controller.QuorumFeatures;
+import org.apache.kafka.metadata.VersionRange;
+
+import java.util.Map;
+
+public final class NodeMetrics implements AutoCloseable {
+ private static final String METRIC_GROUP_NAME = "node-metrics";
+ private static final String FEATURE_NAME_TAG = "feature-name";
+ private static final String MAXIMUM_SUPPORTED_LEVEL_NAME = "maximum-supported-level";
+ private static final String MINIMUM_SUPPORTED_LEVEL_NAME = "minimum-supported-level";
+
+ private final Metrics metrics;
+ private final Map supportedFeatureRanges;
+
+ public NodeMetrics(Metrics metrics, boolean enableUnstableVersions) {
+ this.metrics = metrics;
+ this.supportedFeatureRanges = QuorumFeatures.defaultSupportedFeatureMap(enableUnstableVersions);
+ supportedFeatureRanges.forEach((featureName, versionRange) -> {
+ addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.max());
+ addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.min());
+ });
+ }
+
+ private void addSupportedLevelMetric(String metricName, String featureName, short value) {
+ metrics.addMetric(
+ getFeatureNameTagMetricName(
+ metricName,
+ METRIC_GROUP_NAME,
+ featureName
+ ),
+ (Gauge) (config, now) -> value
+ );
+ }
+
+ @Override
+ public void close() {
+ for (var featureName : supportedFeatureRanges.keySet()) {
+ metrics.removeMetric(
+ getFeatureNameTagMetricName(
+ MAXIMUM_SUPPORTED_LEVEL_NAME,
+ METRIC_GROUP_NAME,
+ featureName
+ )
+ );
+ metrics.removeMetric(
+ getFeatureNameTagMetricName(
+ MINIMUM_SUPPORTED_LEVEL_NAME,
+ METRIC_GROUP_NAME,
+ featureName
+ )
+ );
+ }
+ }
+
+ private MetricName getFeatureNameTagMetricName(String name, String group, String featureName) {
+ return metrics.metricName(
+ name,
+ group,
+ Map.of(FEATURE_NAME_TAG, featureName.replace(".", "-"))
+ );
+ }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
index 9778e015465..e8b1bb71cb5 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
@@ -38,7 +38,7 @@ public final class BrokerServerMetricsTest {
Metrics metrics = new Metrics();
String expectedGroup = "broker-metadata-metrics";
- // Metric description is not use for metric name equality
+ // Metric description is not used for metric name equality
Set expectedMetrics = Set.of(
new MetricName("last-applied-record-offset", expectedGroup, "", Map.of()),
new MetricName("last-applied-record-timestamp", expectedGroup, "", Map.of()),
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java b/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
new file mode 100644
index 00000000000..6aef5d5af09
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class NodeMetricsTest {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMetricsExported(boolean enableUnstableVersions) {
+ Metrics metrics = new Metrics();
+ String expectedGroup = "node-metrics";
+
+ // Metric description is not used for metric name equality
+ Set stableFeatureMetrics = Set.of(
+ new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "metadata-version")),
+ new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "metadata-version")),
+ new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "kraft-version")),
+ new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "kraft-version")),
+ new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "transaction-version")),
+ new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "transaction-version")),
+ new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "group-version")),
+ new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "group-version")),
+ new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "eligible-leader-replicas-version")),
+ new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "eligible-leader-replicas-version")),
+ new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "share-version")),
+ new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "share-version"))
+ );
+
+ Set unstableFeatureMetrics = Set.of(
+ new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "streams-version")),
+ new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "streams-version"))
+ );
+
+ Set expectedMetrics = enableUnstableVersions
+ ? Stream.concat(stableFeatureMetrics.stream(), unstableFeatureMetrics.stream()).collect(Collectors.toSet())
+ : stableFeatureMetrics;
+
+ try (NodeMetrics ignored = new NodeMetrics(metrics, enableUnstableVersions)) {
+ Map metricsMap = metrics.metrics().entrySet().stream()
+ .filter(entry -> Objects.equals(entry.getKey().group(), expectedGroup))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ assertEquals(expectedMetrics.size(), metricsMap.size());
+ metricsMap.forEach((name, metric) -> assertTrue(expectedMetrics.contains(name)));
+ }
+
+ Map metricsMap = metrics.metrics().entrySet().stream()
+ .filter(entry -> Objects.equals(entry.getKey().group(), expectedGroup))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ assertEquals(0, metricsMap.size());
+ }
+}