KAFKA-19254 Add generic feature level metrics (#20021)

This PR adds the following metrics for each of the supported production
features (`metadata.version`, `kraft.version`, `transaction.version`,
etc.):

`kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=X`

`kafka.server:type=node-metrics,name=maximum-supported-level,feature-name=X`

`kafka.server:type=node-metrics,name=minimum-supported-level,feature-name=X`

Reviewers: Josep Prat <josep.prat@aiven.io>, PoAn Yang
 <payang@apache.org>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Lan Ding
 <isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kevin Wu 2025-07-14 15:27:04 -05:00 committed by GitHub
parent 6437135bc0
commit a64f5bf6ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 495 additions and 9 deletions

View File

@ -94,6 +94,7 @@
<allow pkg="org.apache.logging.log4j.core.config" /> <allow pkg="org.apache.logging.log4j.core.config" />
<subpackage name="metrics"> <subpackage name="metrics">
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" /> <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow class="org.apache.kafka.controller.QuorumFeatures" />
<allow pkg="org.apache.kafka.server.telemetry" /> <allow pkg="org.apache.kafka.server.telemetry" />
</subpackage> </subpackage>
<subpackage name="config"> <subpackage name="config">

View File

@ -37,7 +37,7 @@ import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler} import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics} import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics}
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.Arrays import java.util.Arrays
@ -116,6 +116,7 @@ class SharedServer(
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _ @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _ @volatile var brokerMetrics: BrokerServerMetrics = _
@volatile var controllerServerMetrics: ControllerMetadataMetrics = _ @volatile var controllerServerMetrics: ControllerMetadataMetrics = _
@volatile var nodeMetrics: NodeMetrics = _
@volatile var loader: MetadataLoader = _ @volatile var loader: MetadataLoader = _
private val snapshotsDisabledReason = new AtomicReference[String](null) private val snapshotsDisabledReason = new AtomicReference[String](null)
@volatile var snapshotEmitter: SnapshotEmitter = _ @volatile var snapshotEmitter: SnapshotEmitter = _
@ -298,6 +299,7 @@ class SharedServer(
raftManager = _raftManager raftManager = _raftManager
_raftManager.startup() _raftManager.startup()
nodeMetrics = new NodeMetrics(metrics, controllerConfig.unstableFeatureVersionsEnabled)
metadataLoaderMetrics = if (brokerMetrics != null) { metadataLoaderMetrics = if (brokerMetrics != null) {
new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()), new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs), elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
@ -387,6 +389,8 @@ class SharedServer(
controllerServerMetrics = null controllerServerMetrics = null
Utils.closeQuietly(brokerMetrics, "broker metrics") Utils.closeQuietly(brokerMetrics, "broker metrics")
brokerMetrics = null brokerMetrics = null
Utils.closeQuietly(nodeMetrics, "node metrics")
nodeMetrics = null
Utils.closeQuietly(metrics, "metrics") Utils.closeQuietly(metrics, "metrics")
metrics = null metrics = null
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this) CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this)

View File

@ -17,6 +17,8 @@
package org.apache.kafka.image.loader; package org.apache.kafka.image.loader;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataDelta;
@ -30,9 +32,12 @@ import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue; import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader; import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient; import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException; import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotReader;
@ -345,7 +350,24 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
} }
} }
metrics.updateLastAppliedImageProvenance(image.provenance()); metrics.updateLastAppliedImageProvenance(image.provenance());
metrics.setCurrentMetadataVersion(image.features().metadataVersionOrThrow()); MetadataVersion metadataVersion = image.features().metadataVersionOrThrow();
metrics.setCurrentMetadataVersion(metadataVersion);
// Set the metadata version feature level, since it is handled separately from other features
metrics.recordFinalizedFeatureLevel(
MetadataVersion.FEATURE_NAME,
metadataVersion.featureLevel()
);
// Set all production feature levels from the image
metrics.maybeRemoveFinalizedFeatureLevelMetrics(image.features().finalizedVersions());
for (var featureEntry : image.features().finalizedVersions().entrySet()) {
metrics.recordFinalizedFeatureLevel(
featureEntry.getKey(),
featureEntry.getValue()
);
}
if (!uninitializedPublishers.isEmpty()) { if (!uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0); scheduleInitializeNewPublishers(0);
} }
@ -357,6 +379,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
try (reader) { try (reader) {
while (reader.hasNext()) { while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next(); Batch<ApiMessageAndVersion> batch = reader.next();
loadControlRecords(batch);
long elapsedNs = batchLoader.loadBatch(batch, currentLeaderAndEpoch); long elapsedNs = batchLoader.loadBatch(batch, currentLeaderAndEpoch);
metrics.updateBatchSize(batch.records().size()); metrics.updateBatchSize(batch.records().size());
metrics.updateBatchProcessingTimeNs(elapsedNs); metrics.updateBatchProcessingTimeNs(elapsedNs);
@ -418,6 +441,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
int snapshotIndex = 0; int snapshotIndex = 0;
while (reader.hasNext()) { while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next(); Batch<ApiMessageAndVersion> batch = reader.next();
loadControlRecords(batch);
for (ApiMessageAndVersion record : batch.records()) { for (ApiMessageAndVersion record : batch.records()) {
try { try {
delta.replay(record.message()); delta.replay(record.message());
@ -435,6 +459,15 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
time.nanoseconds() - startNs); time.nanoseconds() - startNs);
} }
/**
 * Inspect the control records of a batch and record the finalized
 * kraft.version feature level whenever a KRaftVersionRecord is found.
 * Control records are not part of the metadata image, so kraft.version
 * must be tracked here rather than from the features image.
 *
 * @param batch The batch whose control records are scanned.
 */
void loadControlRecords(Batch<ApiMessageAndVersion> batch) {
    batch.controlRecords().stream()
        .filter(record -> record.type() == ControlRecordType.KRAFT_VERSION)
        .forEach(record -> {
            final var versionRecord = (KRaftVersionRecord) record.message();
            metrics.recordFinalizedFeatureLevel(KRaftVersion.FEATURE_NAME, versionRecord.kRaftVersion());
        });
}
@Override @Override
public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) { public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
eventQueue.append(() -> { eventQueue.append(() -> {

View File

@ -18,6 +18,7 @@
package org.apache.kafka.image.loader.metrics; package org.apache.kafka.image.loader.metrics;
import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.metrics.KafkaYammerMetrics;
@ -25,8 +26,12 @@ import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.MetricsRegistry;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -40,12 +45,15 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
"MetadataLoader", "CurrentMetadataVersion"); "MetadataLoader", "CurrentMetadataVersion");
private static final MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName( private static final MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName(
"MetadataLoader", "HandleLoadSnapshotCount"); "MetadataLoader", "HandleLoadSnapshotCount");
public static final MetricName CURRENT_CONTROLLER_ID = getMetricName( private static final MetricName CURRENT_CONTROLLER_ID = getMetricName(
"MetadataLoader", "CurrentControllerId"); "MetadataLoader", "CurrentControllerId");
private static final String FINALIZED_LEVEL_METRIC_NAME = "FinalizedLevel";
private static final String FEATURE_NAME_TAG = "featureName";
private final Optional<MetricsRegistry> registry; private final Optional<MetricsRegistry> registry;
private final AtomicReference<MetadataVersion> currentMetadataVersion = private final AtomicReference<MetadataVersion> currentMetadataVersion =
new AtomicReference<>(MetadataVersion.MINIMUM_VERSION); new AtomicReference<>(MetadataVersion.MINIMUM_VERSION);
private final Map<String, Short> finalizedFeatureLevels = new ConcurrentHashMap<>();
private final AtomicInteger currentControllerId = new AtomicInteger(-1); private final AtomicInteger currentControllerId = new AtomicInteger(-1);
private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0); private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0);
private final Consumer<Long> batchProcessingTimeNsUpdater; private final Consumer<Long> batchProcessingTimeNsUpdater;
@ -90,6 +98,32 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
})); }));
} }
// Registers a FinalizedLevel gauge for the given feature. The gauge reads
// through finalizedFeatureLevels, so subsequent level changes are reflected
// without re-registering. No-op when no metrics registry is configured.
private void addFinalizedFeatureLevelMetric(String featureName) {
    final var metricName = getFeatureNameTagMetricName(
        "MetadataLoader",
        FINALIZED_LEVEL_METRIC_NAME,
        featureName
    );
    final var levelGauge = new Gauge<Short>() {
        @Override
        public Short value() {
            return finalizedFeatureLevel(featureName);
        }
    };
    registry.ifPresent(r -> r.newGauge(metricName, levelGauge));
}
// Unregisters the FinalizedLevel gauge for the given feature. No-op when no
// metrics registry is configured.
private void removeFinalizedFeatureLevelMetric(String featureName) {
    final var metricName = getFeatureNameTagMetricName(
        "MetadataLoader",
        FINALIZED_LEVEL_METRIC_NAME,
        featureName
    );
    registry.ifPresent(r -> r.removeMetric(metricName));
}
/** /**
* Update the batch processing time histogram. * Update the batch processing time histogram.
*/ */
@ -142,6 +176,48 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
return this.handleLoadSnapshotCount.get(); return this.handleLoadSnapshotCount.get();
} }
/**
 * Remove the FinalizedLevel metric for features that are no longer part of the
 * current features image.
 * Note that metadata.version and kraft.version are not included in
 * the features image, so their metrics are never removed here.
 *
 * @param newFinalizedLevels The new finalized feature levels from the features image
 */
public void maybeRemoveFinalizedFeatureLevelMetrics(Map<String, Short> newFinalizedLevels) {
    final var iter = finalizedFeatureLevels.keySet().iterator();
    while (iter.hasNext()) {
        final var featureName = iter.next();
        // metadata.version and kraft.version are tracked outside the
        // features image, so they are always retained.
        final boolean retained = newFinalizedLevels.containsKey(featureName)
            || featureName.equals(MetadataVersion.FEATURE_NAME)
            || featureName.equals(KRaftVersion.FEATURE_NAME);
        if (!retained) {
            removeFinalizedFeatureLevelMetric(featureName);
            iter.remove();
        }
    }
}
/**
 * Record the finalized feature level, registering the feature's
 * FinalizedLevel gauge the first time the feature is seen.
 *
 * @param featureName The name of the feature
 * @param featureLevel The finalized level for the feature
 */
public void recordFinalizedFeatureLevel(String featureName, short featureLevel) {
    final var previousLevel = finalizedFeatureLevels.put(featureName, featureLevel);
    if (previousLevel == null) {
        // First sighting of this feature: expose its gauge.
        addFinalizedFeatureLevelMetric(featureName);
    }
}
/**
 * Get the finalized feature level for a feature.
 * Note: if the feature has never been recorded, the map lookup returns
 * null and auto-unboxing to {@code short} throws a NullPointerException;
 * callers (and tests) rely on this behavior for unknown features.
 *
 * @param featureName The name of the feature
 * @return The finalized level for the feature
 */
public short finalizedFeatureLevel(String featureName) {
    return finalizedFeatureLevels.get(featureName);
}
@Override @Override
public void close() { public void close() {
registry.ifPresent(r -> List.of( registry.ifPresent(r -> List.of(
@ -149,9 +225,37 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
CURRENT_CONTROLLER_ID, CURRENT_CONTROLLER_ID,
HANDLE_LOAD_SNAPSHOT_COUNT HANDLE_LOAD_SNAPSHOT_COUNT
).forEach(r::removeMetric)); ).forEach(r::removeMetric));
for (var featureName : finalizedFeatureLevels.keySet()) {
removeFinalizedFeatureLevelMetric(featureName);
}
} }
private static MetricName getMetricName(String type, String name) { private static MetricName getMetricName(String type, String name) {
return KafkaYammerMetrics.getMetricName("kafka.server", type, name); return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
} }
// Builds a Yammer metric name for the given type/name carrying a single
// "featureName" tag whose value is the camel-cased feature name.
private static MetricName getFeatureNameTagMetricName(String type, String name, String featureName) {
    final var tags = new LinkedHashMap<String, String>();
    tags.put(FEATURE_NAME_TAG, sanitizeFeatureName(featureName));
    return KafkaYammerMetrics.getMetricName("kafka.server", type, name, tags);
}
/**
 * Sanitize the feature name to be used as a tag in metrics by
 * converting from dot notation to camel case (e.g. "transaction.version"
 * becomes "transactionVersion"). The first word is kept as-is; each
 * following word is capitalized with its remainder lower-cased.
 * The conversion is done to be consistent with other Yammer metrics.
 *
 * @param featureName The feature name in dot notation.
 * @return The sanitized feature name in camel case.
 */
private static String sanitizeFeatureName(String featureName) {
    final var builder = new StringBuilder();
    boolean first = true;
    for (final var word : featureName.split("\\.")) {
        if (first) {
            builder.append(word);
            first = false;
        } else {
            builder.append(Character.toUpperCase(word.charAt(0)))
                   .append(word.substring(1).toLowerCase(Locale.ROOT));
        }
    }
    return builder.toString();
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.kafka.image.loader; package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord; import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord; import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord; import org.apache.kafka.common.metadata.BeginTransactionRecord;
@ -36,6 +37,7 @@ import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ControlRecord; import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.fault.MockFaultHandler; import org.apache.kafka.server.fault.MockFaultHandler;
@ -59,6 +61,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0; import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION; import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -355,6 +358,8 @@ public class MetadataLoaderTest {
publishers.get(0).latestSnapshotManifest); publishers.get(0).latestSnapshotManifest);
assertEquals(MINIMUM_VERSION, assertEquals(MINIMUM_VERSION,
loader.metrics().currentMetadataVersion()); loader.metrics().currentMetadataVersion());
assertEquals(MINIMUM_VERSION.featureLevel(),
loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
} }
assertTrue(publishers.get(0).closed); assertTrue(publishers.get(0).closed);
assertEquals(Optional.of(MINIMUM_VERSION), publishers.get(0).latestImage.features().metadataVersion()); assertEquals(Optional.of(MINIMUM_VERSION), publishers.get(0).latestImage.features().metadataVersion());
@ -508,6 +513,65 @@ public class MetadataLoaderTest {
faultHandler.maybeRethrowFirstException(); faultHandler.maybeRethrowFirstException();
} }
/**
 * Test the kraft.version finalized level value is correct: absent before any
 * control record arrives (lookup throws NPE), 0 after a default
 * KRaftVersionRecord, unchanged by unrelated data batches, and updated by a
 * later KRaftVersionRecord carrying level 1.
 * @throws Exception if publisher installation or the first publish times out
 */
@Test
public void testKRaftVersionFinalizedLevelMetric() throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testKRaftVersionFinalizedLevelMetric");
MockTime time = new MockTime();
List<MockPublisher> publishers = List.of(new MockPublisher());
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setTime(time).
setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) {
loader.installPublishers(publishers).get();
loadTestSnapshot(loader, 200);
// No kraft.version level recorded yet, so the unboxed lookup throws.
assertThrows(
NullPointerException.class,
() -> loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME)
);
publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
// A control batch with a default KRaftVersionRecord (level 0).
MockBatchReader batchReader = new MockBatchReader(
300,
List.of(
Batch.control(
300,
100,
4000,
10,
List.of(ControlRecord.of(new KRaftVersionRecord()))
)
)
).setTime(time);
loader.handleCommit(batchReader);
loader.waitForAllEventsToBeHandled();
assertTrue(batchReader.closed);
assertEquals(300L, loader.lastAppliedOffset());
assertEquals((short) 0, loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME));
// A plain data batch must leave the kraft.version level untouched.
loader.handleCommit(new MockBatchReader(301, List.of(
MockBatchReader.newBatch(301, 100, List.of(
new ApiMessageAndVersion(new RemoveTopicRecord().
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))))));
loader.waitForAllEventsToBeHandled();
assertEquals(301L, loader.lastAppliedOffset());
assertEquals((short) 0, loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME));
// A new control record upgrades the recorded level to 1.
loader.handleCommit(new MockBatchReader(302, List.of(
Batch.control(
302,
100,
4000,
10,
List.of(ControlRecord.of(new KRaftVersionRecord().setKRaftVersion((short) 1)))))));
loader.waitForAllEventsToBeHandled();
assertEquals(302L, loader.lastAppliedOffset());
assertEquals((short) 1, loader.metrics().finalizedFeatureLevel(KRaftVersion.FEATURE_NAME));
}
assertTrue(publishers.get(0).closed);
}
/** /**
* Test that the lastAppliedOffset moves forward as expected. * Test that the lastAppliedOffset moves forward as expected.
*/ */
@ -640,12 +704,16 @@ public class MetadataLoaderTest {
assertEquals(200L, loader.lastAppliedOffset()); assertEquals(200L, loader.lastAppliedOffset());
assertEquals(MINIMUM_VERSION.featureLevel(), assertEquals(MINIMUM_VERSION.featureLevel(),
loader.metrics().currentMetadataVersion().featureLevel()); loader.metrics().currentMetadataVersion().featureLevel());
assertEquals(MINIMUM_VERSION.featureLevel(),
loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
assertFalse(publishers.get(0).latestDelta.image().isEmpty()); assertFalse(publishers.get(0).latestDelta.image().isEmpty());
loadTestSnapshot2(loader, 400); loadTestSnapshot2(loader, 400);
assertEquals(400L, loader.lastAppliedOffset()); assertEquals(400L, loader.lastAppliedOffset());
assertEquals(MetadataVersion.latestProduction().featureLevel(), assertEquals(MetadataVersion.latestProduction().featureLevel(),
loader.metrics().currentMetadataVersion().featureLevel()); loader.metrics().currentMetadataVersion().featureLevel());
assertEquals(MetadataVersion.latestProduction().featureLevel(),
loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
// Make sure the topic in the initial snapshot was overwritten by loading the new snapshot. // Make sure the topic in the initial snapshot was overwritten by loading the new snapshot.
assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo")); assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
@ -659,6 +727,8 @@ public class MetadataLoaderTest {
loader.waitForAllEventsToBeHandled(); loader.waitForAllEventsToBeHandled();
assertEquals(IBP_3_5_IV0.featureLevel(), assertEquals(IBP_3_5_IV0.featureLevel(),
loader.metrics().currentMetadataVersion().featureLevel()); loader.metrics().currentMetadataVersion().featureLevel());
assertEquals(IBP_3_5_IV0.featureLevel(),
loader.metrics().finalizedFeatureLevel(FEATURE_NAME));
} }
faultHandler.maybeRethrowFirstException(); faultHandler.maybeRethrowFirstException();
} }

View File

@ -19,6 +19,9 @@ package org.apache.kafka.image.loader.metrics;
import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils; import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TransactionVersion;
import com.yammer.metrics.core.Gauge; import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricName;
@ -26,16 +29,15 @@ import com.yammer.metrics.core.MetricsRegistry;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
public class MetadataLoaderMetricsTest { public class MetadataLoaderMetricsTest {
private static class FakeMetadataLoaderMetrics implements AutoCloseable { private static class FakeMetadataLoaderMetrics implements AutoCloseable {
final AtomicLong batchProcessingTimeNs = new AtomicLong(0L); final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
@ -72,7 +74,22 @@ public class MetadataLoaderMetricsTest {
"kafka.server:type=MetadataLoader,name=CurrentControllerId", "kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion", "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount" "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
)); )
);
// Record some feature levels and verify their metrics are registered
fakeMetrics.metrics.recordFinalizedFeatureLevel("metadata.version", (short) 3);
fakeMetrics.metrics.recordFinalizedFeatureLevel("kraft.version", (short) 4);
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of(
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion"
)
);
} }
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server", ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of()); Set.of());
@ -114,7 +131,7 @@ public class MetadataLoaderMetricsTest {
MetricsRegistry registry = new MetricsRegistry(); MetricsRegistry registry = new MetricsRegistry();
try { try {
try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
fakeMetrics.metrics.setCurrentMetadataVersion(MINIMUM_VERSION); fakeMetrics.metrics.setCurrentMetadataVersion(MetadataVersion.IBP_3_7_IV0);
fakeMetrics.metrics.incrementHandleLoadSnapshotCount(); fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
fakeMetrics.metrics.incrementHandleLoadSnapshotCount(); fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
@ -122,7 +139,7 @@ public class MetadataLoaderMetricsTest {
Gauge<Integer> currentMetadataVersion = (Gauge<Integer>) registry Gauge<Integer> currentMetadataVersion = (Gauge<Integer>) registry
.allMetrics() .allMetrics()
.get(metricName("MetadataLoader", "CurrentMetadataVersion")); .get(metricName("MetadataLoader", "CurrentMetadataVersion"));
assertEquals(MINIMUM_VERSION.featureLevel(), assertEquals(MetadataVersion.IBP_3_7_IV0.featureLevel(),
currentMetadataVersion.value().shortValue()); currentMetadataVersion.value().shortValue());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -153,8 +170,100 @@ public class MetadataLoaderMetricsTest {
} }
} }
/**
 * Verifies the full lifecycle of FinalizedLevel gauges: none registered
 * initially, one registered per recorded feature with the correct value,
 * removal of features absent from a new image (except metadata.version and
 * kraft.version), re-registration on a later record, and cleanup on close.
 */
@Test
public void testFinalizedFeatureLevelMetrics() {
MetricsRegistry registry = new MetricsRegistry();
try {
try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
// Initially no finalized level metrics should be registered
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of(
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
)
);
// Record metadata version and verify its metric
fakeMetrics.metrics.recordFinalizedFeatureLevel(MetadataVersion.FEATURE_NAME, (short) 5);
@SuppressWarnings("unchecked")
Gauge<Short> finalizedMetadataVersion = (Gauge<Short>) registry
.allMetrics()
.get(metricName("MetadataLoader", "FinalizedLevel", "featureName=metadataVersion"));
assertEquals((short) 5, finalizedMetadataVersion.value());
// Record KRaft version and verify its metric
fakeMetrics.metrics.recordFinalizedFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
@SuppressWarnings("unchecked")
Gauge<Short> finalizedKRaftVersion = (Gauge<Short>) registry
.allMetrics()
.get(metricName("MetadataLoader", "FinalizedLevel", "featureName=kraftVersion"));
assertEquals((short) 1, finalizedKRaftVersion.value());
// Record transaction version and verify its metric
fakeMetrics.metrics.recordFinalizedFeatureLevel(TransactionVersion.FEATURE_NAME, (short) 1);
@SuppressWarnings("unchecked")
Gauge<Short> finalizedTransactionVersion = (Gauge<Short>) registry
.allMetrics()
.get(metricName("MetadataLoader", "FinalizedLevel", "featureName=transactionVersion"));
assertEquals((short) 1, finalizedTransactionVersion.value());
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of(
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
)
);
// When a feature's finalized level is not present in the new image, its metric should be removed
// This does not apply to metadataVersion and kraftVersion
fakeMetrics.metrics.maybeRemoveFinalizedFeatureLevelMetrics(Map.of());
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of(
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion"
)
);
// Set the finalized feature level and check the metric is added back with its correct value
fakeMetrics.metrics.recordFinalizedFeatureLevel(TransactionVersion.FEATURE_NAME, (short) 2);
@SuppressWarnings("unchecked")
Gauge<Short> finalizedTransactionVersion2 = (Gauge<Short>) registry
.allMetrics()
.get(metricName("MetadataLoader", "FinalizedLevel", "featureName=transactionVersion"));
assertEquals((short) 2, finalizedTransactionVersion2.value());
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of(
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
)
);
}
// Closing the metrics object must deregister every gauge it created.
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
Set.of());
} finally {
registry.shutdown();
}
}
private static MetricName metricName(String type, String name) { private static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name); String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name);
return new MetricName("kafka.server", type, name, null, mBeanName); return new MetricName("kafka.server", type, name, null, mBeanName);
} }
// Builds a "kafka.server" MetricName whose mBean name carries an extra scope
// suffix, e.g. kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=x.
private static MetricName metricName(String type, String name, String scope) {
    String mBeanName = "kafka.server:type=" + type + ",name=" + name + "," + scope;
    return new MetricName("kafka.server", type, name, scope, mBeanName);
}
} }

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.metadata.VersionRange;
import java.util.Map;
/**
 * Registers gauges reporting the minimum and maximum supported level of every
 * production feature on this node, e.g.
 * {@code kafka.server:type=node-metrics,name=maximum-supported-level,feature-name=metadata-version}.
 *
 * <p>All gauges are added in the constructor and removed again by {@link #close()}.
 */
public final class NodeMetrics implements AutoCloseable {
    private static final String METRIC_GROUP_NAME = "node-metrics";
    private static final String FEATURE_NAME_TAG = "feature-name";
    private static final String MAXIMUM_SUPPORTED_LEVEL_NAME = "maximum-supported-level";
    private static final String MINIMUM_SUPPORTED_LEVEL_NAME = "minimum-supported-level";

    private final Metrics metrics;
    private final Map<String, VersionRange> supportedFeatureRanges;

    /**
     * Creates the node metrics and registers one minimum-level and one
     * maximum-level gauge per supported feature.
     *
     * @param metrics                the registry the gauges are added to
     * @param enableUnstableVersions whether unstable feature versions are
     *                               included in the supported feature map
     */
    public NodeMetrics(Metrics metrics, boolean enableUnstableVersions) {
        this.metrics = metrics;
        this.supportedFeatureRanges = QuorumFeatures.defaultSupportedFeatureMap(enableUnstableVersions);
        supportedFeatureRanges.forEach((featureName, versionRange) -> {
            addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.max());
            addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.min());
        });
    }

    // Registers a constant-valued gauge for one (metric, feature) pair.
    private void addSupportedLevelMetric(String metricName, String featureName, short value) {
        metrics.addMetric(
            getFeatureNameTagMetricName(
                metricName,
                METRIC_GROUP_NAME,
                featureName
            ),
            (Gauge<Short>) (config, now) -> value
        );
    }

    // Mirrors addSupportedLevelMetric so registration and removal stay in sync.
    private void removeSupportedLevelMetric(String metricName, String featureName) {
        metrics.removeMetric(
            getFeatureNameTagMetricName(
                metricName,
                METRIC_GROUP_NAME,
                featureName
            )
        );
    }

    /** Deregisters every gauge added by the constructor. */
    @Override
    public void close() {
        for (var featureName : supportedFeatureRanges.keySet()) {
            removeSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName);
            removeSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName);
        }
    }

    // Dots in the feature name (e.g. "metadata.version") are rewritten to dashes
    // so the tag value matches the group's dash-separated naming style.
    private MetricName getFeatureNameTagMetricName(String name, String group, String featureName) {
        return metrics.metricName(
            name,
            group,
            Map.of(FEATURE_NAME_TAG, featureName.replace(".", "-"))
        );
    }
}

View File

@ -38,7 +38,7 @@ public final class BrokerServerMetricsTest {
Metrics metrics = new Metrics(); Metrics metrics = new Metrics();
String expectedGroup = "broker-metadata-metrics"; String expectedGroup = "broker-metadata-metrics";
// Metric description is not use for metric name equality // Metric description is not used for metric name equality
Set<MetricName> expectedMetrics = Set.of( Set<MetricName> expectedMetrics = Set.of(
new MetricName("last-applied-record-offset", expectedGroup, "", Map.of()), new MetricName("last-applied-record-offset", expectedGroup, "", Map.of()),
new MetricName("last-applied-record-timestamp", expectedGroup, "", Map.of()), new MetricName("last-applied-record-timestamp", expectedGroup, "", Map.of()),

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Verifies that {@link NodeMetrics} registers the expected per-feature
 * supported-level gauges and removes all of them on close.
 */
public class NodeMetricsTest {
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testMetricsExported(boolean enableUnstableVersions) {
        Metrics metrics = new Metrics();
        String expectedGroup = "node-metrics";
        // Metric description is not used for metric name equality
        Set<MetricName> stableFeatureMetrics = Set.of(
            new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "metadata-version")),
            new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "metadata-version")),
            new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "kraft-version")),
            new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "kraft-version")),
            new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "transaction-version")),
            new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "transaction-version")),
            new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "group-version")),
            new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "group-version")),
            new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "eligible-leader-replicas-version")),
            new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "eligible-leader-replicas-version")),
            new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "share-version")),
            new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "share-version"))
        );
        Set<MetricName> unstableFeatureMetrics = Set.of(
            new MetricName("maximum-supported-level", expectedGroup, "", Map.of("feature-name", "streams-version")),
            new MetricName("minimum-supported-level", expectedGroup, "", Map.of("feature-name", "streams-version"))
        );
        Set<MetricName> expectedMetrics = enableUnstableVersions
            ? Stream.concat(stableFeatureMetrics.stream(), unstableFeatureMetrics.stream()).collect(Collectors.toSet())
            : stableFeatureMetrics;
        try (NodeMetrics ignored = new NodeMetrics(metrics, enableUnstableVersions)) {
            // Comparing the whole key set at once produces a readable diff on failure,
            // unlike separate size and containment checks.
            assertEquals(expectedMetrics, nodeMetricNames(metrics, expectedGroup));
        }
        // Closing NodeMetrics must deregister every gauge it added.
        assertEquals(Set.of(), nodeMetricNames(metrics, expectedGroup));
    }

    // Collects the names of all registered metrics belonging to the given group.
    private static Set<MetricName> nodeMetricNames(Metrics metrics, String group) {
        return metrics.metrics().keySet().stream()
            .filter(name -> Objects.equals(name.group(), group))
            .collect(Collectors.toSet());
    }
}