diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 4c84f53f779..cbb8c6e1b9d 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -848,8 +848,9 @@ public class ReplicationControlManager { int partitionIndex = partEntry.getKey(); PartitionRegistration info = partEntry.getValue(); records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder(). - setMetadataVersion(featureControl.metadataVersion()). - build())); + setMetadataVersion(featureControl.metadataVersion()). + setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()). + build())); } return ApiError.NONE; } @@ -1468,7 +1469,7 @@ public class ReplicationControlManager { * @param records The record list to append to. */ void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List records) { - if (featureControl.metadataVersion().isElrSupported() && !isCleanShutdown) { + if (featureControl.isElrFeatureEnabled() && !isCleanShutdown) { // ELR is enabled, generate unclean shutdown partition change records generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); @@ -1942,6 +1943,7 @@ public class ReplicationControlManager { records.add(buildPartitionRegistration(partitionAssignment, isr) .toRecord(topicId, partitionId, new ImageWriterOptions.Builder(). setMetadataVersion(featureControl.metadataVersion()). + setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()). build())); partitionId++; } diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java index eba2d3c26af..b4a7c4f30e2 100644 --- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.image.node.FeaturesImageNode; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import java.util.ArrayList; @@ -67,6 +68,11 @@ public final class FeaturesImage { return finalizedVersions; } + public boolean isElrEnabled() { + return finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_0.featureLevel()) + >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel(); + } + private Optional finalizedVersion(String feature) { return Optional.ofNullable(finalizedVersions.get(feature)); } diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index 56f5f20268b..703123fe395 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -295,6 +295,7 @@ public class MetadataLoader implements RaftClient.Listener ImageReWriter writer = new ImageReWriter(delta); image.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(image.features().metadataVersion()). + setEligibleLeaderReplicasEnabled(image.features().isElrEnabled()). build()); // ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here. SnapshotManifest manifest = new SnapshotManifest( diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java index c3e08caf9f2..18d7be0ea6d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java +++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java @@ -148,6 +148,7 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter { try { image.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(image.features().metadataVersion()). + setEligibleLeaderReplicasEnabled(image.features().isElrEnabled()). build()); writer.close(true); metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds()); diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java index f84fe1e66d6..457229d6364 100644 --- a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java +++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java @@ -30,6 +30,7 @@ public final class ImageWriterOptions { public static class Builder { private MetadataVersion metadataVersion; private MetadataVersion requestedMetadataVersion; + private boolean isEligibleLeaderReplicasEnabled = false; private Consumer lossHandler = e -> { throw e; }; @@ -40,6 +41,7 @@ public final class ImageWriterOptions { public Builder(MetadataImage image) { this.metadataVersion = image.features().metadataVersion(); + this.isEligibleLeaderReplicasEnabled = image.features().isElrEnabled(); } public Builder setMetadataVersion(MetadataVersion metadataVersion) { @@ -54,6 +56,11 @@ public final class ImageWriterOptions { return this; } + public Builder setEligibleLeaderReplicasEnabled(boolean isEligibleLeaderReplicasEnabled) { + this.isEligibleLeaderReplicasEnabled = isEligibleLeaderReplicasEnabled; + return this; + } + public MetadataVersion metadataVersion() { return metadataVersion; } @@ -62,33 +69,43 @@ public final class ImageWriterOptions { return requestedMetadataVersion; } + public boolean isEligibleLeaderReplicasEnabled() { + return isEligibleLeaderReplicasEnabled; + } + public Builder setLossHandler(Consumer lossHandler) { this.lossHandler = lossHandler; return this; } public ImageWriterOptions build() { - return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion); + return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion, isEligibleLeaderReplicasEnabled); } } private final MetadataVersion metadataVersion; private final MetadataVersion requestedMetadataVersion; private final Consumer lossHandler; + private final boolean isEligibleLeaderReplicasEnabled; private ImageWriterOptions( MetadataVersion metadataVersion, Consumer lossHandler, - MetadataVersion orgMetadataVersion + MetadataVersion orgMetadataVersion, + boolean isEligibleLeaderReplicasEnabled ) { this.metadataVersion = metadataVersion; this.lossHandler = lossHandler; this.requestedMetadataVersion = orgMetadataVersion; + this.isEligibleLeaderReplicasEnabled = isEligibleLeaderReplicasEnabled; } public MetadataVersion metadataVersion() { return metadataVersion; } + public boolean isEligibleLeaderReplicasEnabled() { + return isEligibleLeaderReplicasEnabled; + } public void handleLoss(String loss) { lossHandler.accept(new UnwritableMetadataException(requestedMetadataVersion, loss)); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index 99a877a7953..76f4fafe605 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -387,12 +387,17 @@ public class PartitionRegistration { setLeaderRecoveryState(leaderRecoveryState.value()). setLeaderEpoch(leaderEpoch). setPartitionEpoch(partitionEpoch); - if (options.metadataVersion().isElrSupported()) { + if (options.isEligibleLeaderReplicasEnabled()) { // The following are tagged fields, we should only set them when there are some contents, in order to save // spaces. if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr)); if (lastKnownElr.length > 0) record.setLastKnownElr(Replicas.toList(lastKnownElr)); } + + if (options.metadataVersion() == null) { + options.handleLoss("the metadata version"); + return new ApiMessageAndVersion(record, (short) 0); + } if (options.metadataVersion().isDirectoryAssignmentSupported()) { record.setDirectories(Uuid.toList(directories)); } else { diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java index 1df5ff65563..cb83467e7fb 100644 --- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.RecordListWriter; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; @@ -166,4 +167,19 @@ public class FeaturesImageTest { MetadataVersion.IBP_3_3_IV0).isEmpty()); assertTrue(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(), FeaturesImage.EMPTY.metadataVersion()).isEmpty()); } + + @Test + public void testElrEnabled() { + FeaturesImage image1 = new FeaturesImage( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_0.featureLevel()), + MetadataVersion.latestTesting() + ); + assertFalse(image1.isElrEnabled()); + + FeaturesImage image2 = new FeaturesImage( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), + MetadataVersion.latestTesting() + ); + assertTrue(image2.isElrEnabled()); + } } diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java index c1c119f9b4c..8ed78258f93 100644 --- a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java @@ -17,10 +17,26 @@ package org.apache.kafka.image.writer; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.image.AclsImageTest; +import org.apache.kafka.image.ClientQuotasImageTest; +import org.apache.kafka.image.ClusterImageTest; +import org.apache.kafka.image.ConfigurationsImageTest; +import org.apache.kafka.image.DelegationTokenImageTest; +import org.apache.kafka.image.FeaturesDelta; +import org.apache.kafka.image.FeaturesImage; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.ProducerIdsImageTest; +import org.apache.kafka.image.ScramImageTest; +import org.apache.kafka.image.TopicsImageTest; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.function.Consumer; @@ -71,4 +87,53 @@ public class ImageWriterOptionsTest { options.handleLoss(expectedMessage); } } + + @Test + public void testSetEligibleLeaderReplicasEnabled() { + MetadataVersion version = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION; + ImageWriterOptions options = new ImageWriterOptions.Builder(). + setMetadataVersion(version). + setEligibleLeaderReplicasEnabled(true).build(); + assertEquals(true, options.isEligibleLeaderReplicasEnabled()); + + options = new ImageWriterOptions.Builder(). + setMetadataVersion(version).build(); + assertEquals(false, options.isEligibleLeaderReplicasEnabled()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConstructionWithImage(boolean isElrEnabled) { + FeaturesDelta featuresDelta = new FeaturesDelta(FeaturesImage.EMPTY); + featuresDelta.replay(new FeatureLevelRecord(). + setName(EligibleLeaderReplicasVersion.FEATURE_NAME). + setFeatureLevel(isElrEnabled ? + EligibleLeaderReplicasVersion.ELRV_1.featureLevel() : EligibleLeaderReplicasVersion.ELRV_0.featureLevel() + ) + ); + featuresDelta.replay(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel()) + ); + MetadataImage metadataImage = new MetadataImage( + new MetadataProvenance(100, 4, 2000, true), + featuresDelta.apply(), + ClusterImageTest.IMAGE1, + TopicsImageTest.IMAGE1, + ConfigurationsImageTest.IMAGE1, + ClientQuotasImageTest.IMAGE1, + ProducerIdsImageTest.IMAGE1, + AclsImageTest.IMAGE1, + ScramImageTest.IMAGE1, + DelegationTokenImageTest.IMAGE1 + ); + + ImageWriterOptions options = new ImageWriterOptions.Builder(metadataImage).build(); + assertEquals(MetadataVersion.IBP_4_0_IV1, options.metadataVersion()); + if (isElrEnabled) { + assertEquals(true, options.isEligibleLeaderReplicasEnabled()); + } else { + assertEquals(false, options.isEligibleLeaderReplicasEnabled()); + } + } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index c7d1767889b..2ef04c83f01 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -338,6 +338,7 @@ public class PartitionRegistrationTest { List exceptions = new ArrayList<>(); ImageWriterOptions options = new ImageWriterOptions.Builder(). setMetadataVersion(metadataVersion). + setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()). setLossHandler(exceptions::add). build(); assertEquals(new ApiMessageAndVersion(expectRecord, metadataVersion.partitionRecordVersion()), diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index fd0a4086add..2d198f278d5 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.raft.DynamicVoters; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.GroupVersion; import org.apache.kafka.server.common.MetadataVersion; @@ -341,6 +342,9 @@ public class FormatterTest { setName(MetadataVersion.FEATURE_NAME). setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 0)); + expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(EligibleLeaderReplicasVersion.FEATURE_NAME). + setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0)); expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). setName(GroupVersion.FEATURE_NAME). setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0)); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java index 84a80c82d45..ae3dab0fb31 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java @@ -65,10 +65,6 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion { return dependencies; } - public boolean isEligibleLeaderReplicasFeatureEnabeld() { - return featureLevel >= ELRV_1.featureLevel; - } - public static EligibleLeaderReplicasVersion fromFeatureLevel(short version) { switch (version) { case 0: diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 310a9242b23..f632862dd8f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -113,15 +113,17 @@ public enum MetadataVersion { // Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848). IBP_4_0_IV0(22, "4.0", "IV0", false), + // Add ELR related supports (KIP-966). + // PartitionRecord and PartitionChangeRecord are updated. + // ClearElrRecord is added. + IBP_4_0_IV1(23, "4.0", "IV1", true), + // // NOTE: MetadataVersions after this point are unstable and may be changed. // If users attempt to use an unstable MetadataVersion, they will get an error. // Please move this comment when updating the LATEST_PRODUCTION constant. // - // Add ELR related supports (KIP-966). - IBP_4_0_IV1(23, "4.0", "IV1", true), - // Bootstrap metadata version for transaction versions 1 and 2 (KIP-890) IBP_4_0_IV2(24, "4.0", "IV2", false), @@ -152,7 +154,7 @@ public enum MetadataVersion { * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED. */ - public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV0; + public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV1; // If you change the value above please also update // LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 6eeeedd4452..7e212d60010 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -114,7 +114,7 @@ DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT") LATEST_STABLE_TRANSACTION_VERSION = 2 # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java -LATEST_STABLE_METADATA_VERSION = "4.0-IV0" +LATEST_STABLE_METADATA_VERSION = "4.0-IV1" # 0.11.0.x versions V_0_11_0_3 = KafkaVersion("0.11.0.3")