KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127)

The kafka controllers need to set kraft.version in their
ApiVersionsResponse messages according to the current kraft.version
reported by the Raft layer. Instead, currently they always set it to 0.

Also remove FeatureControlManager.latestFinalizedFeatures. It is not
needed and it does a lot of copying.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2025-03-07 13:46:46 -08:00 committed by José Armando García Sancio
parent 988da9fc81
commit 21dd41b251
3 changed files with 50 additions and 2 deletions

View File

@ -44,9 +44,10 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.{CredentialProvider, PasswordEncoder}
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@ -176,7 +177,9 @@ class ControllerServer(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
config.migrationEnabled,
() => featuresPublisher.features()
() => featuresPublisher.features().setFinalizedLevel(
KRaftVersion.FEATURE_NAME,
raftManager.client.kraftVersion().featureLevel())
)
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)

View File

@ -1047,6 +1047,34 @@ class KRaftClusterTest {
}
}
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).build()
try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
val admin = Admin.create(cluster.newClientPropertiesBuilder().
setUsingBootstrapControllers(usingBootstrapControlers).
build())
try {
val featureMetadata = admin.describeFeatures().featureMetadata().get()
assertEquals(new SupportedVersionRange(0, 1),
featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME))
assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort),
featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME))
} finally {
admin.close()
}
} finally {
cluster.close()
}
}
@Test
def testRemoteLogManagerInstantiation(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(

View File

@ -25,6 +25,7 @@ public final class FinalizedFeatures {
private final MetadataVersion metadataVersion;
private final Map<String, Short> finalizedFeatures;
private final long finalizedFeaturesEpoch;
private final boolean kraftMode;
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Collections.emptyMap(), -1, true);
@ -39,6 +40,7 @@ public final class FinalizedFeatures {
this.metadataVersion = metadataVersion;
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.kraftMode = kraftMode;
// In KRaft mode, we always include the metadata version in the features map.
// In ZK mode, we never include it.
if (kraftMode) {
@ -82,4 +84,19 @@ public final class FinalizedFeatures {
", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
")";
}
public FinalizedFeatures setFinalizedLevel(String key, short level) {
if (level == (short) 0) {
return this;
} else {
Map<String, Short> newFinalizedFeatures = new HashMap<>(finalizedFeatures);
newFinalizedFeatures.put(key, level);
return new FinalizedFeatures(
metadataVersion,
newFinalizedFeatures,
finalizedFeaturesEpoch,
kraftMode
);
}
}
}