diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 90deff7ed86..8cc14516126 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -242,8 +242,7 @@ class ControllerServer( setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs). setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs). setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs). - setInterBrokerListenerName(config.interBrokerListenerName.value()). - setEligibleLeaderReplicasEnabled(config.elrEnabled) + setInterBrokerListenerName(config.interBrokerListenerName.value()) } controller = controllerBuilder.build() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index fec938758c4..18327a59b55 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -340,8 +340,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val migrationEnabled: Boolean = getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG) val migrationMetadataMinBatchSize: Int = getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG) - val elrEnabled: Boolean = getBoolean(KRaftConfigs.ELR_ENABLED_CONFIG) - private def parseProcessRoles(): Set[ProcessRole] = { val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map { case "broker" => ProcessRole.BrokerRole diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index cc0c2bb3662..45712240ecd 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -43,7 +43,7 @@ import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.queue.KafkaEventQueue import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory} -import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion} +import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion} import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.apache.kafka.server.util.timer.SystemTimer @@ -376,6 +376,12 @@ abstract class QuorumTestHarness extends Logging { } else TransactionVersion.TV_1.featureLevel() formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME, transactionVersion) + val elrVersion = + if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) { + EligibleLeaderReplicasVersion.ELRV_1.featureLevel() + } else EligibleLeaderReplicasVersion.ELRV_0.featureLevel() + formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, elrVersion) + addFormatterSettings(formatter) formatter.run() val bootstrapMetadata = formatter.bootstrapMetadata() diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala index 3b7732374d5..cd22727839e 100644 --- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala +++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala @@ -67,4 +67,12 @@ object TestInfoUtils { def isTransactionV2Enabled(testInfo: TestInfo): Boolean = { !testInfo.getDisplayName.contains("isTV2Enabled=false") } + + /** + * Returns whether eligible leader replicas version 1 is enabled. + * When no parameter is provided, the default returned is false. + */ + def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = { + testInfo.getDisplayName.contains("isELRV1Enabled=true") + } } diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 4f442d3194e..296e285bdeb 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.record.RecordVersion import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, TransactionVersion} +import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, TransactionVersion} import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Tag @@ -65,11 +65,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion ): Unit = { if (apiVersion >= 3) { - assertEquals(3, apiVersionsResponse.data().finalizedFeatures().size()) + assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) - assertEquals(4, apiVersionsResponse.data().supportedFeatures().size()) + assertEquals(5, apiVersionsResponse.data().supportedFeatures().size()) assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion()) if (apiVersion < 4) { assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion()) @@ -83,6 +83,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion()) assertEquals(GroupVersion.GV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion()) + + assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion()) + assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion()) } val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) { ApiVersionsResponse.collectApis( diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 5a213e6c186..b38a2178bae 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -336,7 +336,7 @@ Found problem: properties.putAll(defaultStaticQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) assertEquals("Unsupported feature: non.existent.feature. Supported features are: " + - "group.version, kraft.version, transaction.version", + "eligible.leader.replicas.version, group.version, kraft.version, transaction.version", assertThrows(classOf[FormatterException], () => runFormatCommand(new ByteArrayOutputStream(), properties, Seq("--feature", "non.existent.feature=20"))).getMessage) diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index f114d594ae5..436c9d868cf 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -396,6 +396,15 @@ public class FeatureControlManager { return new FinalizedControllerFeatures(features, epoch); } + FinalizedControllerFeatures latestFinalizedFeatures() { + Map features = new HashMap<>(); + features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel()); + for (Entry entry : finalizedVersions.entrySet()) { + features.put(entry.getKey(), entry.getValue()); + } + return new FinalizedControllerFeatures(features, -1); + } + public void replay(FeatureLevelRecord record) { VersionRange range = quorumFeatures.localSupportedFeature(record.name()); if (!range.contains(record.featureLevel())) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 71e43b55b25..bd98f7c8096 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -214,7 +214,6 @@ public final class QuorumController implements Controller { private Map staticConfig = Collections.emptyMap(); private BootstrapMetadata bootstrapMetadata = null; private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH; - private boolean eligibleLeaderReplicasEnabled = false; private DelegationTokenCache tokenCache; private String tokenSecretKeyString; private long delegationTokenMaxLifeMs; @@ -337,11 +336,6 @@ public final class QuorumController implements Controller { return this; } - public Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) { - this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled; - return this; - } - public Builder setDelegationTokenCache(DelegationTokenCache tokenCache) { this.tokenCache = tokenCache; return this; @@ -432,7 +426,6 @@ public final class QuorumController implements Controller { delegationTokenMaxLifeMs, delegationTokenExpiryTimeMs, delegationTokenExpiryCheckIntervalMs, - eligibleLeaderReplicasEnabled, uncleanLeaderElectionCheckIntervalMs, interBrokerListenerName ); @@ -1436,11 +1429,6 @@ public final class QuorumController implements Controller { */ private final BootstrapMetadata bootstrapMetadata; - /** - * True if the KIP-966 eligible leader replicas feature is enabled. - */ - private final boolean eligibleLeaderReplicasEnabled; - /** * The maximum number of records per batch to allow. */ @@ -1480,7 +1468,6 @@ public final class QuorumController implements Controller { long delegationTokenMaxLifeMs, long delegationTokenExpiryTimeMs, long delegationTokenExpiryCheckIntervalMs, - boolean eligibleLeaderReplicasEnabled, long uncleanLeaderElectionCheckIntervalMs, String interBrokerListenerName ) { @@ -1549,7 +1536,6 @@ public final class QuorumController implements Controller { setLogContext(logContext). setDefaultReplicationFactor(defaultReplicationFactor). setDefaultNumPartitions(defaultNumPartitions). - setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled). setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE). setConfigurationControl(configurationControl). setClusterControl(clusterControl). @@ -1580,7 +1566,6 @@ public final class QuorumController implements Controller { this.metaLogListener = new QuorumMetaLogListener(); this.curClaimEpoch = -1; this.recordRedactor = new RecordRedactor(configSchema); - this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled; if (maxIdleIntervalNs.isPresent()) { registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong()); } @@ -1599,10 +1584,7 @@ public final class QuorumController implements Controller { setMetrics(controllerMetrics). setTime(time). build(); - - log.info("Creating new QuorumController with clusterId {}.{}", - clusterId, - eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : ""); + log.info("Creating new QuorumController with clusterId {}", clusterId); this.raftClient.register(metaLogListener); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 16cc762ebc5..a995068eccd 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -95,6 +95,7 @@ import org.apache.kafka.metadata.placement.PlacementSpec; import org.apache.kafka.metadata.placement.TopicAssignment; import org.apache.kafka.metadata.placement.UsableBroker; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.TopicIdPartition; import org.apache.kafka.server.mutable.BoundedList; import org.apache.kafka.server.policy.CreateTopicPolicy; @@ -165,7 +166,6 @@ public class ReplicationControlManager { private ClusterControlManager clusterControl = null; private Optional createTopicPolicy = Optional.empty(); private FeatureControlManager featureControl = null; - private boolean eligibleLeaderReplicasEnabled = false; Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) { this.snapshotRegistry = snapshotRegistry; @@ -187,11 +187,6 @@ public class ReplicationControlManager { return this; } - Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) { - this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled; - return this; - } - Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) { this.maxElectionsPerImbalance = maxElectionsPerImbalance; return this; @@ -233,7 +228,6 @@ public class ReplicationControlManager { defaultReplicationFactor, defaultNumPartitions, maxElectionsPerImbalance, - eligibleLeaderReplicasEnabled, configurationControl, clusterControl, createTopicPolicy, @@ -305,11 +299,6 @@ public class ReplicationControlManager { */ private final int defaultNumPartitions; - /** - * True if eligible leader replicas is enabled. - */ - private final boolean eligibleLeaderReplicasEnabled; - /** * Maximum number of leader elections to perform during one partition leader balancing operation. */ @@ -399,7 +388,6 @@ public class ReplicationControlManager { short defaultReplicationFactor, int defaultNumPartitions, int maxElectionsPerImbalance, - boolean eligibleLeaderReplicasEnabled, ConfigurationControlManager configurationControl, ClusterControlManager clusterControl, Optional createTopicPolicy, @@ -410,7 +398,6 @@ public class ReplicationControlManager { this.defaultReplicationFactor = defaultReplicationFactor; this.defaultNumPartitions = defaultNumPartitions; this.maxElectionsPerImbalance = maxElectionsPerImbalance; - this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled; this.configurationControl = configurationControl; this.createTopicPolicy = createTopicPolicy; this.featureControl = featureControl; @@ -1029,7 +1016,8 @@ public class ReplicationControlManager { } boolean isElrEnabled() { - return eligibleLeaderReplicasEnabled && featureControl.metadataVersion().isElrSupported(); + return featureControl.metadataVersion().isElrSupported() && featureControl.latestFinalizedFeatures(). + versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel(); } ControllerResult alterPartition( diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java index a8331fe9f23..2cca9ef7cc6 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicRe import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import org.slf4j.Logger; @@ -75,6 +76,32 @@ public class QuorumControllerIntegrationTestUtils { return features; } + /** + * Create a broker features collection for use in a registration request. MV and given features are included. + * + * @param minVersion The minimum supported MV. + * @param maxVersion The maximum supported MV. + * @param featureMaxVersions The features and their max supported versions. + */ + static BrokerRegistrationRequestData.FeatureCollection brokerFeaturesPlusFeatureVersions( + MetadataVersion minVersion, + MetadataVersion maxVersion, + Map featureMaxVersions + ) { + BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection(); + features.add(new BrokerRegistrationRequestData.Feature() + .setName(MetadataVersion.FEATURE_NAME) + .setMinSupportedVersion(minVersion.featureLevel()) + .setMaxSupportedVersion(maxVersion.featureLevel())); + featureMaxVersions.entrySet().forEach(entry -> { + features.add(new BrokerRegistrationRequestData.Feature() + .setName(entry.getKey()) + .setMaxSupportedVersion(entry.getValue()) + .setMinSupportedVersion((short) 0)); + }); + return features; + } + /** * Register the given number of brokers. * @@ -94,7 +121,8 @@ public class QuorumControllerIntegrationTestUtils { .setBrokerId(brokerId) .setRack(null) .setClusterId(controller.clusterId()) - .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())) + .setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(), + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))) .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId)) .setLogDirs(Collections.singletonList( Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA") diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 867957513b4..6e64483a311 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -106,6 +106,7 @@ import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; @@ -157,6 +158,7 @@ import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry; import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT; import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures; +import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers; @@ -188,7 +190,8 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())). + setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(), + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))). setClusterId(logEnv.clusterId())).get(); @@ -229,7 +232,8 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())). + setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(), + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))). setClusterId(logEnv.clusterId())).get(); @@ -367,14 +371,16 @@ public class QuorumControllerTest { listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); QuorumController active = controlEnv.activeController(); Map brokerEpochs = new HashMap<>(); - + BrokerRegistrationRequestData.FeatureCollection features = + brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1, + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel())); for (Integer brokerId : allBrokers) { CompletableFuture reply = active.registerBroker( anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData(). setBrokerId(brokerId). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1)). + setFeatures(features). setIncarnationId(Uuid.randomUuid()). setLogDirs(Collections.singletonList(Uuid.randomUuid())). setListeners(listeners)); @@ -442,7 +448,7 @@ public class QuorumControllerTest { new BrokerRegistrationRequestData(). setBrokerId(brokerToUncleanShutdown). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1)). + setFeatures(features). setIncarnationId(Uuid.randomUuid()). setLogDirs(Collections.singletonList(Uuid.randomUuid())). setListeners(listeners)).get(); @@ -455,7 +461,7 @@ public class QuorumControllerTest { new BrokerRegistrationRequestData(). setBrokerId(lastKnownElr[0]). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV1)). + setFeatures(features). setIncarnationId(Uuid.randomUuid()). setLogDirs(Collections.singletonList(Uuid.randomUuid())). setListeners(listeners)).get(); @@ -737,7 +743,8 @@ public class QuorumControllerTest { setBrokerId(0). setClusterId(active.clusterId()). setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())). + setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(), + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))). setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))). setListeners(listeners)); assertEquals(5L, reply.get().epoch()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index 8bece2bb86c..70ae12c96cd 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -17,10 +17,12 @@ package org.apache.kafka.controller; +import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.metadata.FakeKafkaConfigSchema; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.fault.MockFaultHandler; import org.apache.kafka.test.TestUtils; @@ -115,11 +117,17 @@ public class QuorumControllerTestEnv implements AutoCloseable { fatalFaultHandlers.put(nodeId, fatalFaultHandler); MockFaultHandler nonFatalFaultHandler = new MockFaultHandler("nonFatalFaultHandler"); builder.setNonFatalFaultHandler(nonFatalFaultHandler); - builder.setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled); builder.setConfigSchema(FakeKafkaConfigSchema.INSTANCE); nonFatalFaultHandlers.put(nodeId, fatalFaultHandler); controllerBuilderInitializer.accept(builder); - this.controllers.add(builder.build()); + QuorumController controller = builder.build(); + if (eligibleLeaderReplicasEnabled) { + controller.featureControl().replay(new FeatureLevelRecord() + .setName(EligibleLeaderReplicasVersion.FEATURE_NAME) + .setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()) + ); + } + this.controllers.add(controller); } } catch (Exception e) { close(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index c53f068e9d6..55ed04c15a7 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -63,6 +63,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.On import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; @@ -92,6 +93,7 @@ import org.apache.kafka.metadata.Replicas; import org.apache.kafka.metadata.placement.StripedReplicaPlacer; import org.apache.kafka.metadata.placement.UsableBroker; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.TopicIdPartition; import org.apache.kafka.server.policy.CreateTopicPolicy; @@ -218,6 +220,7 @@ public class ReplicationControlManagerTest { final ClusterControlManager clusterControl; final ConfigurationControlManager configurationControl; final ReplicationControlManager replicationControl; + final OffsetControlManager offsetControlManager; void replay(List records) { RecordTestUtils.replayAll(clusterControl, records); @@ -245,6 +248,12 @@ public class ReplicationControlManagerTest { Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); + featureControl.replay(new FeatureLevelRecord() + .setName(EligibleLeaderReplicasVersion.FEATURE_NAME) + .setFeatureLevel(isElrEnabled ? + EligibleLeaderReplicasVersion.ELRV_1.featureLevel() : + EligibleLeaderReplicasVersion.ELRV_0.featureLevel()) + ); this.clusterControl = new ClusterControlManager.Builder(). setLogContext(logContext). setTime(time). @@ -254,7 +263,9 @@ public class ReplicationControlManagerTest { setFeatureControlManager(featureControl). setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown). build(); - + this.offsetControlManager = new OffsetControlManager.Builder(). + setSnapshotRegistry(snapshotRegistry). + build(); this.replicationControl = new ReplicationControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setLogContext(logContext). @@ -263,7 +274,6 @@ public class ReplicationControlManagerTest { setClusterControl(clusterControl). setCreateTopicPolicy(createTopicPolicy). setFeatureControl(featureControl). - setEligibleLeaderReplicasEnabled(isElrEnabled). build(); clusterControl.activate(); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index c0d9cd4ee95..39f5925a0d2 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -360,7 +360,8 @@ public class FormatterTest { formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values())); formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1); assertEquals("Unsupported feature: nonexistent.feature. Supported features " + - "are: group.version, kraft.version, test.feature.version, transaction.version", + "are: eligible.leader.replicas.version, group.version, kraft.version, " + + "test.feature.version, transaction.version", assertThrows(FormatterException.class, () -> formatter1.formatter.run()). getMessage()); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java new file mode 100644 index 00000000000..fd10690a663 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum EligibleLeaderReplicasVersion implements FeatureVersion { + + // Version 0 is the version disable ELR. + ELRV_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + + // Version 1 enables the ELR (KIP-966). + ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap()); + + public static final String FEATURE_NAME = "eligible.leader.replicas.version"; + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + EligibleLeaderReplicasVersion( + int featureLevel, + MetadataVersion bootstrapMetadataVersion, + Map dependencies + ) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + + public boolean isEligibleLeaderReplicasFeatureEnabeld() { + return featureLevel >= ELRV_1.featureLevel; + } + + public static EligibleLeaderReplicasVersion fromFeatureLevel(short version) { + switch (version) { + case 0: + return ELRV_0; + case 1: + return ELRV_1; + default: + throw new RuntimeException("Unknown eligible leader replicas feature level: " + (int) version); + } + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java index 51f3d78e868..bd4fa0c8615 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java @@ -44,7 +44,8 @@ public enum Features { TEST_VERSION("test.feature.version", TestFeatureVersion.values()), KRAFT_VERSION("kraft.version", KRaftVersion.values()), TRANSACTION_VERSION("transaction.version", TransactionVersion.values()), - GROUP_VERSION("group.version", GroupVersion.values()); + GROUP_VERSION("group.version", GroupVersion.values()), + ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.values()); public static final Features[] FEATURES; public static final List PRODUCTION_FEATURES; diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java index a29c71ad4be..d2cf35f33a5 100644 --- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java @@ -123,9 +123,6 @@ public class KRaftConfigs { public static final int MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT = 200; public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_DOC = "Soft minimum batch size to use when migrating metadata from ZooKeeper to KRaft"; - /** Enable eligible leader replicas configs */ - public static final String ELR_ENABLED_CONFIG = "eligible.leader.replicas.enable"; - public static final String ELR_ENABLED_DOC = "Enable the Eligible leader replicas"; public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC) .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC) @@ -145,7 +142,6 @@ public class KRaftConfigs { .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC) .defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC) .define(MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH, MIGRATION_ENABLED_DOC) - .define(ELR_ENABLED_CONFIG, BOOLEAN, false, HIGH, ELR_ENABLED_DOC) .defineInternal(MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT, MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1), MEDIUM, MIGRATION_METADATA_MIN_BATCH_SIZE_DOC); } diff --git a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java index 6ce2b3a7e65..8963b40fceb 100644 --- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java +++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.server.common.Features.ELIGIBLE_LEADER_REPLICAS_VERSION; import static org.apache.kafka.server.common.Features.GROUP_VERSION; import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -111,6 +112,7 @@ public class BrokerFeaturesTest { expectedFeatures.put(MetadataVersion.FEATURE_NAME, MetadataVersion.latestTesting().featureLevel()); expectedFeatures.put(TRANSACTION_VERSION.featureName(), TRANSACTION_VERSION.latestTesting()); expectedFeatures.put(GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting()); + expectedFeatures.put(ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting()); expectedFeatures.put("kraft.version", (short) 0); expectedFeatures.put("test_feature_1", (short) 4); expectedFeatures.put("test_feature_2", (short) 3); diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index 452f74d5c19..1aafc7db994 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -60,14 +60,16 @@ public class FeatureCommandTest { List features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) - assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + + assertEquals("Feature: eligible.leader.replicas.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); - assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + + assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(1))); + assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2))); assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(2))); + "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(3))); assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(3))); + "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4))); } // Use the first MetadataVersion that supports KIP-919 @@ -80,14 +82,16 @@ public class FeatureCommandTest { List features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) - assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + + assertEquals("Feature: eligible.leader.replicas.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); - assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + + assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(1))); + assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2))); assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(2))); + "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3))); assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(3))); + "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4))); } @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1) @@ -172,7 +176,8 @@ public class FeatureCommandTest { "downgrade", "--release-version", "3.7-IV3")) ); - assertEquals("group.version was downgraded to 0.\n" + + assertEquals("eligible.leader.replicas.version was downgraded to 0.\n" + + "group.version was downgraded to 0.\n" + "kraft.version was downgraded to 0.\n" + "metadata.version was downgraded to 18.\n" + "transaction.version was downgraded to 0.", commandOutput);