diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 0905215f9a2..11a0e2bbf86 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -44,9 +44,10 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.{CredentialProvider, PasswordEncoder} import org.apache.kafka.server.NodeToControllerChannelManager import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.common.KRaftVersion import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} @@ -176,7 +177,9 @@ class ControllerServer( ListenerType.CONTROLLER, config.unstableApiVersionsEnabled, config.migrationEnabled, - () => featuresPublisher.features() + () => featuresPublisher.features().setFinalizedLevel( + KRaftVersion.FEATURE_NAME, + raftManager.client.kraftVersion().featureLevel()) ) tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 18402e54083..63d1b186e0b 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1047,6 +1047,34 @@ class KRaftClusterTest { } } + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()).build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.newClientPropertiesBuilder(). + setUsingBootstrapControllers(usingBootstrapControlers). + build()) + try { + val featureMetadata = admin.describeFeatures().featureMetadata().get() + assertEquals(new SupportedVersionRange(0, 1), + featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME)) + assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort), + featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME)) + } finally { + admin.close() + } + } finally { + cluster.close() + } + } + @Test def testRemoteLogManagerInstantiation(): Unit = { val cluster = new KafkaClusterTestKit.Builder( diff --git a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java index de78a3a72a8..74fce1f81ff 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java @@ -25,6 +25,7 @@ public final class FinalizedFeatures { private final MetadataVersion metadataVersion; private final Map finalizedFeatures; private final long finalizedFeaturesEpoch; + private final boolean kraftMode; public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); @@ -39,6 +40,7 @@ public final class FinalizedFeatures { this.metadataVersion = metadataVersion; this.finalizedFeatures = new HashMap<>(finalizedFeatures); this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; + this.kraftMode = kraftMode; // In KRaft mode, we always include the metadata version in the features map. // In ZK mode, we never include it. if (kraftMode) { @@ -82,4 +84,19 @@ public final class FinalizedFeatures { ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch + ")"; } + + public FinalizedFeatures setFinalizedLevel(String key, short level) { + if (level == (short) 0) { + return this; + } else { + Map newFinalizedFeatures = new HashMap<>(finalizedFeatures); + newFinalizedFeatures.put(key, level); + return new FinalizedFeatures( + metadataVersion, + newFinalizedFeatures, + finalizedFeaturesEpoch, + kraftMode + ); + } + } }