KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127)

The kafka controllers need to set kraft.version in their
ApiVersionsResponse messages according to the current kraft.version
reported by the Raft layer. Instead, currently they always set it to 0.

Also remove FeatureControlManager.latestFinalizedFeatures. It is not
needed and it does a lot of copying.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2025-03-07 13:46:46 -08:00 committed by GitHub
parent 6940bef6e8
commit 343bc995f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 47 additions and 12 deletions

View File

@ -42,7 +42,7 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
@ -153,7 +153,9 @@ class ControllerServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
() => featuresPublisher.features()
() => featuresPublisher.features().setFinalizedLevel(
KRaftVersion.FEATURE_NAME,
raftManager.client.kraftVersion().featureLevel())
)
// metrics will be set to null when closing a controller, so we should recreate it for testing

View File

@ -1004,6 +1004,35 @@ class KRaftClusterTest {
}
}
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).
setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
val admin = Admin.create(cluster.newClientPropertiesBuilder().
setUsingBootstrapControllers(usingBootstrapControlers).
build())
try {
val featureMetadata = admin.describeFeatures().featureMetadata().get()
assertEquals(new SupportedVersionRange(0, 1),
featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME))
assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort),
featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME))
} finally {
admin.close()
}
} finally {
cluster.close()
}
}
@Test
def testRemoteLogManagerInstantiation(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(

View File

@ -359,15 +359,6 @@ public class FeatureControlManager {
return new FinalizedControllerFeatures(features, epoch);
}
FinalizedControllerFeatures latestFinalizedFeatures() {
Map<String, Short> features = new HashMap<>();
features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
features.put(entry.getKey(), entry.getValue());
}
return new FinalizedControllerFeatures(features, -1);
}
public void replay(FeatureLevelRecord record) {
VersionRange range = quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {
@ -395,7 +386,7 @@ public class FeatureControlManager {
}
boolean isElrFeatureEnabled() {
return latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >=
return finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >=
EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}
}

View File

@ -40,4 +40,17 @@ public record FinalizedFeatures(
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
}
public FinalizedFeatures setFinalizedLevel(String key, short level) {
if (level == (short) 0) {
return this;
} else {
Map<String, Short> newFinalizedFeatures = new HashMap<>(finalizedFeatures);
newFinalizedFeatures.put(key, level);
return new FinalizedFeatures(
metadataVersion,
newFinalizedFeatures,
finalizedFeaturesEpoch);
}
}
}