KAFKA-18062: use feature version to enable ELR (#17867)

Replace the ELR static config with feature version.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Calvin Liu 2024-11-26 14:40:23 -08:00 committed by GitHub
parent f5d712396b
commit 2b2b3cd355
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 201 additions and 70 deletions

View File

@ -242,8 +242,7 @@ class ControllerServer(
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
setInterBrokerListenerName(config.interBrokerListenerName.value()).
setEligibleLeaderReplicasEnabled(config.elrEnabled)
setInterBrokerListenerName(config.interBrokerListenerName.value())
}
controller = controllerBuilder.build()

View File

@ -340,8 +340,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val migrationEnabled: Boolean = getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG)
val migrationMetadataMinBatchSize: Int = getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG)
val elrEnabled: Boolean = getBoolean(KRaftConfigs.ELR_ENABLED_CONFIG)
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole

View File

@ -43,7 +43,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.timer.SystemTimer
@ -376,6 +376,12 @@ abstract class QuorumTestHarness extends Logging {
} else TransactionVersion.TV_1.featureLevel()
formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME, transactionVersion)
val elrVersion =
if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) {
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
} else EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, elrVersion)
addFormatterSettings(formatter)
formatter.run()
val bootstrapMetadata = formatter.bootstrapMetadata()

View File

@ -67,4 +67,12 @@ object TestInfoUtils {
def isTransactionV2Enabled(testInfo: TestInfo): Boolean = {
!testInfo.getDisplayName.contains("isTV2Enabled=false")
}
/**
 * Indicates whether eligible leader replicas version 1 (ELRV_1) is enabled for this test.
 * Enabled only when the test display name explicitly carries "isELRV1Enabled=true";
 * when no such parameter is present, this returns false.
 */
def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean =
  testInfo.getDisplayName.contains("isELRV1Enabled=true")
}

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.RecordVersion
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
@ -65,11 +65,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (apiVersion >= 3) {
assertEquals(3, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
assertEquals(4, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(5, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@ -83,6 +83,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion())
assertEquals(GroupVersion.GV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion())
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion())
assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(

View File

@ -336,7 +336,7 @@ Found problem:
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
"group.version, kraft.version, transaction.version",
"eligible.leader.replicas.version, group.version, kraft.version, transaction.version",
assertThrows(classOf[FormatterException], () =>
runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--feature", "non.existent.feature=20"))).getMessage)

View File

@ -396,6 +396,15 @@ public class FeatureControlManager {
return new FinalizedControllerFeatures(features, epoch);
}
/**
 * Builds a snapshot of the currently finalized feature levels, including
 * metadata.version, without tying it to a committed epoch (epoch is set to -1).
 */
FinalizedControllerFeatures latestFinalizedFeatures() {
    Map<String, Short> featureMap = new HashMap<>();
    // metadata.version is tracked separately from the other finalized features.
    featureMap.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
    finalizedVersions.forEach(featureMap::put);
    return new FinalizedControllerFeatures(featureMap, -1);
}
public void replay(FeatureLevelRecord record) {
VersionRange range = quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {

View File

@ -214,7 +214,6 @@ public final class QuorumController implements Controller {
private Map<String, Object> staticConfig = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
private boolean eligibleLeaderReplicasEnabled = false;
private DelegationTokenCache tokenCache;
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
@ -337,11 +336,6 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}
public Builder setDelegationTokenCache(DelegationTokenCache tokenCache) {
this.tokenCache = tokenCache;
return this;
@ -432,7 +426,6 @@ public final class QuorumController implements Controller {
delegationTokenMaxLifeMs,
delegationTokenExpiryTimeMs,
delegationTokenExpiryCheckIntervalMs,
eligibleLeaderReplicasEnabled,
uncleanLeaderElectionCheckIntervalMs,
interBrokerListenerName
);
@ -1436,11 +1429,6 @@ public final class QuorumController implements Controller {
*/
private final BootstrapMetadata bootstrapMetadata;
/**
* True if the KIP-966 eligible leader replicas feature is enabled.
*/
private final boolean eligibleLeaderReplicasEnabled;
/**
* The maximum number of records per batch to allow.
*/
@ -1480,7 +1468,6 @@ public final class QuorumController implements Controller {
long delegationTokenMaxLifeMs,
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs,
boolean eligibleLeaderReplicasEnabled,
long uncleanLeaderElectionCheckIntervalMs,
String interBrokerListenerName
) {
@ -1549,7 +1536,6 @@ public final class QuorumController implements Controller {
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
setDefaultNumPartitions(defaultNumPartitions).
setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
@ -1580,7 +1566,6 @@ public final class QuorumController implements Controller {
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.recordRedactor = new RecordRedactor(configSchema);
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
if (maxIdleIntervalNs.isPresent()) {
registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
}
@ -1599,10 +1584,7 @@ public final class QuorumController implements Controller {
setMetrics(controllerMetrics).
setTime(time).
build();
log.info("Creating new QuorumController with clusterId {}.{}",
clusterId,
eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : "");
log.info("Creating new QuorumController with clusterId {}", clusterId);
this.raftClient.register(metaLogListener);
}

View File

@ -95,6 +95,7 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateTopicPolicy;
@ -165,7 +166,6 @@ public class ReplicationControlManager {
private ClusterControlManager clusterControl = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private FeatureControlManager featureControl = null;
private boolean eligibleLeaderReplicasEnabled = false;
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
@ -187,11 +187,6 @@ public class ReplicationControlManager {
return this;
}
Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}
Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
@ -233,7 +228,6 @@ public class ReplicationControlManager {
defaultReplicationFactor,
defaultNumPartitions,
maxElectionsPerImbalance,
eligibleLeaderReplicasEnabled,
configurationControl,
clusterControl,
createTopicPolicy,
@ -305,11 +299,6 @@ public class ReplicationControlManager {
*/
private final int defaultNumPartitions;
/**
* True if eligible leader replicas is enabled.
*/
private final boolean eligibleLeaderReplicasEnabled;
/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
@ -399,7 +388,6 @@ public class ReplicationControlManager {
short defaultReplicationFactor,
int defaultNumPartitions,
int maxElectionsPerImbalance,
boolean eligibleLeaderReplicasEnabled,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Optional<CreateTopicPolicy> createTopicPolicy,
@ -410,7 +398,6 @@ public class ReplicationControlManager {
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
@ -1029,7 +1016,8 @@ public class ReplicationControlManager {
}
boolean isElrEnabled() {
return eligibleLeaderReplicasEnabled && featureControl.metadataVersion().isElrSupported();
return featureControl.metadataVersion().isElrSupported() && featureControl.latestFinalizedFeatures().
versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}
ControllerResult<AlterPartitionResponseData> alterPartition(

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicRe
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
@ -75,6 +76,32 @@ public class QuorumControllerIntegrationTestUtils {
return features;
}
/**
 * Create a broker feature collection for use in a registration request.
 * The metadata.version range and the given feature max versions are included.
 *
 * @param minVersion         The minimum supported metadata version.
 * @param maxVersion         The maximum supported metadata version.
 * @param featureMaxVersions The features and their max supported versions. Each
 *                           feature is registered with a min supported version of 0.
 * @return the feature collection to attach to a BrokerRegistrationRequest.
 */
static BrokerRegistrationRequestData.FeatureCollection brokerFeaturesPlusFeatureVersions(
    MetadataVersion minVersion,
    MetadataVersion maxVersion,
    Map<String, Short> featureMaxVersions
) {
    BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
    features.add(new BrokerRegistrationRequestData.Feature()
        .setName(MetadataVersion.FEATURE_NAME)
        .setMinSupportedVersion(minVersion.featureLevel())
        .setMaxSupportedVersion(maxVersion.featureLevel()));
    // Map.forEach avoids the entrySet()/getKey()/getValue() indirection of the
    // original entrySet().forEach loop.
    featureMaxVersions.forEach((featureName, maxFeatureLevel) ->
        features.add(new BrokerRegistrationRequestData.Feature()
            .setName(featureName)
            .setMinSupportedVersion((short) 0)
            .setMaxSupportedVersion(maxFeatureLevel)));
    return features;
}
/**
* Register the given number of brokers.
*
@ -94,7 +121,8 @@ public class QuorumControllerIntegrationTestUtils {
.setBrokerId(brokerId)
.setRack(null)
.setClusterId(controller.clusterId())
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting()))
.setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(),
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel())))
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
.setLogDirs(Collections.singletonList(
Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA")

View File

@ -106,6 +106,7 @@ import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
@ -157,6 +158,7 @@ import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers;
@ -188,7 +190,8 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())).
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(),
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
setClusterId(logEnv.clusterId())).get();
@ -229,7 +232,8 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())).
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(),
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
setClusterId(logEnv.clusterId())).get();
@ -367,14 +371,16 @@ public class QuorumControllerTest {
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
Map<Integer, Long> brokerEpochs = new HashMap<>();
BrokerRegistrationRequestData.FeatureCollection features =
brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1,
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()));
for (Integer brokerId : allBrokers) {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1)).
setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners));
@ -442,7 +448,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(brokerToUncleanShutdown).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1)).
setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
@ -455,7 +461,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(lastKnownElr[0]).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1)).
setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
@ -737,7 +743,8 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())).
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(),
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());

View File

@ -17,10 +17,12 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.metadata.FakeKafkaConfigSchema;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
@ -115,11 +117,17 @@ public class QuorumControllerTestEnv implements AutoCloseable {
fatalFaultHandlers.put(nodeId, fatalFaultHandler);
MockFaultHandler nonFatalFaultHandler = new MockFaultHandler("nonFatalFaultHandler");
builder.setNonFatalFaultHandler(nonFatalFaultHandler);
builder.setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled);
builder.setConfigSchema(FakeKafkaConfigSchema.INSTANCE);
nonFatalFaultHandlers.put(nodeId, fatalFaultHandler);
controllerBuilderInitializer.accept(builder);
this.controllers.add(builder.build());
QuorumController controller = builder.build();
if (eligibleLeaderReplicasEnabled) {
controller.featureControl().replay(new FeatureLevelRecord()
.setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
.setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel())
);
}
this.controllers.add(controller);
}
} catch (Exception e) {
close();

View File

@ -63,6 +63,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.On
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@ -92,6 +93,7 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.policy.CreateTopicPolicy;
@ -218,6 +220,7 @@ public class ReplicationControlManagerTest {
final ClusterControlManager clusterControl;
final ConfigurationControlManager configurationControl;
final ReplicationControlManager replicationControl;
final OffsetControlManager offsetControlManager;
void replay(List<ApiMessageAndVersion> records) {
RecordTestUtils.replayAll(clusterControl, records);
@ -245,6 +248,12 @@ public class ReplicationControlManagerTest {
Collections.singletonList(0))).
setMetadataVersion(metadataVersion).
build();
featureControl.replay(new FeatureLevelRecord()
.setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
.setFeatureLevel(isElrEnabled ?
EligibleLeaderReplicasVersion.ELRV_1.featureLevel() :
EligibleLeaderReplicasVersion.ELRV_0.featureLevel())
);
this.clusterControl = new ClusterControlManager.Builder().
setLogContext(logContext).
setTime(time).
@ -254,7 +263,9 @@ public class ReplicationControlManagerTest {
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
build();
this.offsetControlManager = new OffsetControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();
this.replicationControl = new ReplicationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
@ -263,7 +274,6 @@ public class ReplicationControlManagerTest {
setClusterControl(clusterControl).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
setEligibleLeaderReplicasEnabled(isElrEnabled).
build();
clusterControl.activate();
}

View File

@ -360,7 +360,8 @@ public class FormatterTest {
formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values()));
formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1);
assertEquals("Unsupported feature: nonexistent.feature. Supported features " +
"are: group.version, kraft.version, test.feature.version, transaction.version",
"are: eligible.leader.replicas.version, group.version, kraft.version, " +
"test.feature.version, transaction.version",
assertThrows(FormatterException.class,
() -> formatter1.formatter.run()).
getMessage());

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Collections;
import java.util.Map;
/**
 * The feature version that controls the Eligible Leader Replicas (ELR) feature (KIP-966).
 * Replaces the former static "eligible.leader.replicas.enable" config.
 */
public enum EligibleLeaderReplicasVersion implements FeatureVersion {
    // Version 0 keeps ELR disabled.
    ELRV_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),

    // Version 1 enables ELR (KIP-966).
    ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap());

    public static final String FEATURE_NAME = "eligible.leader.replicas.version";

    private final short featureLevel;
    private final MetadataVersion bootstrapMetadataVersion;
    private final Map<String, Short> dependencies;

    EligibleLeaderReplicasVersion(
        int featureLevel,
        MetadataVersion bootstrapMetadataVersion,
        Map<String, Short> dependencies
    ) {
        this.featureLevel = (short) featureLevel;
        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
        this.dependencies = dependencies;
    }

    @Override
    public short featureLevel() {
        return featureLevel;
    }

    @Override
    public String featureName() {
        return FEATURE_NAME;
    }

    @Override
    public MetadataVersion bootstrapMetadataVersion() {
        return bootstrapMetadataVersion;
    }

    @Override
    public Map<String, Short> dependencies() {
        return dependencies;
    }

    /**
     * @return true if this feature level enables the ELR feature (ELRV_1 or later).
     */
    public boolean isEligibleLeaderReplicasFeatureEnabled() {
        return featureLevel >= ELRV_1.featureLevel;
    }

    /**
     * @deprecated Misspelled; use {@link #isEligibleLeaderReplicasFeatureEnabled()}.
     *             Kept as a delegating alias for backward compatibility.
     */
    @Deprecated
    public boolean isEligibleLeaderReplicasFeatureEnabeld() {
        return isEligibleLeaderReplicasFeatureEnabled();
    }

    /**
     * Maps a raw feature level to its enum constant.
     *
     * @param version the finalized feature level.
     * @return the matching version constant.
     * @throws IllegalArgumentException if the level is unknown. (Narrowed from the
     *         bare RuntimeException; IllegalArgumentException is a RuntimeException
     *         subclass, so existing catch blocks still work.)
     */
    public static EligibleLeaderReplicasVersion fromFeatureLevel(short version) {
        switch (version) {
            case 0:
                return ELRV_0;
            case 1:
                return ELRV_1;
            default:
                throw new IllegalArgumentException(
                    "Unknown eligible leader replicas feature level: " + (int) version);
        }
    }
}

View File

@ -44,7 +44,8 @@ public enum Features {
TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
KRAFT_VERSION("kraft.version", KRaftVersion.values()),
TRANSACTION_VERSION("transaction.version", TransactionVersion.values()),
GROUP_VERSION("group.version", GroupVersion.values());
GROUP_VERSION("group.version", GroupVersion.values()),
ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.values());
public static final Features[] FEATURES;
public static final List<Features> PRODUCTION_FEATURES;

View File

@ -123,9 +123,6 @@ public class KRaftConfigs {
public static final int MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT = 200;
public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_DOC = "Soft minimum batch size to use when migrating metadata from ZooKeeper to KRaft";
/** Enable eligible leader replicas configs */
public static final String ELR_ENABLED_CONFIG = "eligible.leader.replicas.enable";
public static final String ELR_ENABLED_DOC = "Enable the Eligible leader replicas";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
@ -145,7 +142,6 @@ public class KRaftConfigs {
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC)
.define(MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH, MIGRATION_ENABLED_DOC)
.define(ELR_ENABLED_CONFIG, BOOLEAN, false, HIGH, ELR_ENABLED_DOC)
.defineInternal(MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT, MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1),
MEDIUM, MIGRATION_METADATA_MIN_BATCH_SIZE_DOC);
}

View File

@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.server.common.Features.ELIGIBLE_LEADER_REPLICAS_VERSION;
import static org.apache.kafka.server.common.Features.GROUP_VERSION;
import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -111,6 +112,7 @@ public class BrokerFeaturesTest {
expectedFeatures.put(MetadataVersion.FEATURE_NAME, MetadataVersion.latestTesting().featureLevel());
expectedFeatures.put(TRANSACTION_VERSION.featureName(), TRANSACTION_VERSION.latestTesting());
expectedFeatures.put(GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting());
expectedFeatures.put(ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting());
expectedFeatures.put("kraft.version", (short) 0);
expectedFeatures.put("test_feature_1", (short) 4);
expectedFeatures.put("test_feature_2", (short) 3);

View File

@ -60,14 +60,16 @@ public class FeatureCommandTest {
List<String> features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
assertEquals("Feature: eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0)));
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(1)));
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(2)));
"SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(3)));
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
}
// Use the first MetadataVersion that supports KIP-919
@ -80,14 +82,16 @@ public class FeatureCommandTest {
List<String> features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
assertEquals("Feature: eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0)));
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(1)));
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(2)));
"SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(3)));
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
@ -172,7 +176,8 @@ public class FeatureCommandTest {
"downgrade", "--release-version", "3.7-IV3"))
);
assertEquals("group.version was downgraded to 0.\n" +
assertEquals("eligible.leader.replicas.version was downgraded to 0.\n" +
"group.version was downgraded to 0.\n" +
"kraft.version was downgraded to 0.\n" +
"metadata.version was downgraded to 18.\n" +
"transaction.version was downgraded to 0.", commandOutput);