From 21dd41b251a1cc2faa0d3848ba0b973c1eba7eff Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Fri, 7 Mar 2025 13:46:46 -0800 Subject: [PATCH] KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127) The kafka controllers need to set kraft.version in their ApiVersionsResponse messages according to the current kraft.version reported by the Raft layer. Instead, currently they always set it to 0. Also remove FeatureControlManager.latestFinalizedFeatures. It is not needed and it does a lot of copying. Reviewers: Jun Rao , Chia-Ping Tsai --- .../scala/kafka/server/ControllerServer.scala | 7 +++-- .../kafka/server/KRaftClusterTest.scala | 28 +++++++++++++++++++ .../server/common/FinalizedFeatures.java | 17 +++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 0905215f9a2..11a0e2bbf86 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -44,9 +44,10 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.{CredentialProvider, PasswordEncoder} import org.apache.kafka.server.NodeToControllerChannelManager import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.common.KRaftVersion import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} @@ -176,7 +177,9 @@ class ControllerServer( ListenerType.CONTROLLER, config.unstableApiVersionsEnabled, config.migrationEnabled, - () => featuresPublisher.features() + () => featuresPublisher.features().setFinalizedLevel( + KRaftVersion.FEATURE_NAME, + raftManager.client.kraftVersion().featureLevel()) ) tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 18402e54083..63d1b186e0b 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1047,6 +1047,34 @@ class KRaftClusterTest { } } + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()).build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.newClientPropertiesBuilder(). + setUsingBootstrapControllers(usingBootstrapControlers). + build()) + try { + val featureMetadata = admin.describeFeatures().featureMetadata().get() + assertEquals(new SupportedVersionRange(0, 1), + featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME)) + assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort), + featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME)) + } finally { + admin.close() + } + } finally { + cluster.close() + } + } + @Test def testRemoteLogManagerInstantiation(): Unit = { val cluster = new KafkaClusterTestKit.Builder( diff --git a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java index de78a3a72a8..74fce1f81ff 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java @@ -25,6 +25,7 @@ public final class FinalizedFeatures { private final MetadataVersion metadataVersion; private final Map finalizedFeatures; private final long finalizedFeaturesEpoch; + private final boolean kraftMode; public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); @@ -39,6 +40,7 @@ public final class FinalizedFeatures { this.metadataVersion = metadataVersion; this.finalizedFeatures = new HashMap<>(finalizedFeatures); this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; + this.kraftMode = kraftMode; // In KRaft mode, we always include the metadata version in the features map. // In ZK mode, we never include it. if (kraftMode) { @@ -82,4 +84,19 @@ public final class FinalizedFeatures { ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch + ")"; } + + public FinalizedFeatures setFinalizedLevel(String key, short level) { + if (level == (short) 0) { + return this; + } else { + Map newFinalizedFeatures = new HashMap<>(finalizedFeatures); + newFinalizedFeatures.put(key, level); + return new FinalizedFeatures( + metadataVersion, + newFinalizedFeatures, + finalizedFeaturesEpoch, + kraftMode + ); + } + } }