KAFKA-18634: Fix ELR metadata version issues (#18680)

This patch cleans up the places that should not use MV to determine ELR is enabled marks 4.0IV1 stable.

Reviewers: Alyssa Huang <ahuang@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Calvin Liu 2025-02-13 23:40:31 -08:00 committed by GitHub
parent 2bbd25841e
commit e7a2af8414
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 131 additions and 15 deletions

View File

@ -849,6 +849,7 @@ public class ReplicationControlManager {
PartitionRegistration info = partEntry.getValue(); PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder(). records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()). setMetadataVersion(featureControl.metadataVersion()).
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
build())); build()));
} }
return ApiError.NONE; return ApiError.NONE;
@ -1468,7 +1469,7 @@ public class ReplicationControlManager {
* @param records The record list to append to. * @param records The record list to append to.
*/ */
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) { void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isElrSupported() && !isCleanShutdown) { if (featureControl.isElrFeatureEnabled() && !isCleanShutdown) {
// ELR is enabled, generate unclean shutdown partition change records // ELR is enabled, generate unclean shutdown partition change records
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records, generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
@ -1942,6 +1943,7 @@ public class ReplicationControlManager {
records.add(buildPartitionRegistration(partitionAssignment, isr) records.add(buildPartitionRegistration(partitionAssignment, isr)
.toRecord(topicId, partitionId, new ImageWriterOptions.Builder(). .toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()). setMetadataVersion(featureControl.metadataVersion()).
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
build())); build()));
partitionId++; partitionId++;
} }

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.node.FeaturesImageNode; import org.apache.kafka.image.node.FeaturesImageNode;
import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList; import java.util.ArrayList;
@ -67,6 +68,11 @@ public final class FeaturesImage {
return finalizedVersions; return finalizedVersions;
} }
public boolean isElrEnabled() {
return finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_0.featureLevel())
>= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}
private Optional<Short> finalizedVersion(String feature) { private Optional<Short> finalizedVersion(String feature) {
return Optional.ofNullable(finalizedVersions.get(feature)); return Optional.ofNullable(finalizedVersions.get(feature));
} }

View File

@ -295,6 +295,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
ImageReWriter writer = new ImageReWriter(delta); ImageReWriter writer = new ImageReWriter(delta);
image.write(writer, new ImageWriterOptions.Builder(). image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()). setMetadataVersion(image.features().metadataVersion()).
setEligibleLeaderReplicasEnabled(image.features().isElrEnabled()).
build()); build());
// ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here. // ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here.
SnapshotManifest manifest = new SnapshotManifest( SnapshotManifest manifest = new SnapshotManifest(

View File

@ -148,6 +148,7 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
try { try {
image.write(writer, new ImageWriterOptions.Builder(). image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()). setMetadataVersion(image.features().metadataVersion()).
setEligibleLeaderReplicasEnabled(image.features().isElrEnabled()).
build()); build());
writer.close(true); writer.close(true);
metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds()); metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds());

View File

@ -30,6 +30,7 @@ public final class ImageWriterOptions {
public static class Builder { public static class Builder {
private MetadataVersion metadataVersion; private MetadataVersion metadataVersion;
private MetadataVersion requestedMetadataVersion; private MetadataVersion requestedMetadataVersion;
private boolean isEligibleLeaderReplicasEnabled = false;
private Consumer<UnwritableMetadataException> lossHandler = e -> { private Consumer<UnwritableMetadataException> lossHandler = e -> {
throw e; throw e;
}; };
@ -40,6 +41,7 @@ public final class ImageWriterOptions {
public Builder(MetadataImage image) { public Builder(MetadataImage image) {
this.metadataVersion = image.features().metadataVersion(); this.metadataVersion = image.features().metadataVersion();
this.isEligibleLeaderReplicasEnabled = image.features().isElrEnabled();
} }
public Builder setMetadataVersion(MetadataVersion metadataVersion) { public Builder setMetadataVersion(MetadataVersion metadataVersion) {
@ -54,6 +56,11 @@ public final class ImageWriterOptions {
return this; return this;
} }
public Builder setEligibleLeaderReplicasEnabled(boolean isEligibleLeaderReplicasEnabled) {
this.isEligibleLeaderReplicasEnabled = isEligibleLeaderReplicasEnabled;
return this;
}
public MetadataVersion metadataVersion() { public MetadataVersion metadataVersion() {
return metadataVersion; return metadataVersion;
} }
@ -62,33 +69,43 @@ public final class ImageWriterOptions {
return requestedMetadataVersion; return requestedMetadataVersion;
} }
public boolean isEligibleLeaderReplicasEnabled() {
return isEligibleLeaderReplicasEnabled;
}
public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) { public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) {
this.lossHandler = lossHandler; this.lossHandler = lossHandler;
return this; return this;
} }
public ImageWriterOptions build() { public ImageWriterOptions build() {
return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion); return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion, isEligibleLeaderReplicasEnabled);
} }
} }
private final MetadataVersion metadataVersion; private final MetadataVersion metadataVersion;
private final MetadataVersion requestedMetadataVersion; private final MetadataVersion requestedMetadataVersion;
private final Consumer<UnwritableMetadataException> lossHandler; private final Consumer<UnwritableMetadataException> lossHandler;
private final boolean isEligibleLeaderReplicasEnabled;
private ImageWriterOptions( private ImageWriterOptions(
MetadataVersion metadataVersion, MetadataVersion metadataVersion,
Consumer<UnwritableMetadataException> lossHandler, Consumer<UnwritableMetadataException> lossHandler,
MetadataVersion orgMetadataVersion MetadataVersion orgMetadataVersion,
boolean isEligibleLeaderReplicasEnabled
) { ) {
this.metadataVersion = metadataVersion; this.metadataVersion = metadataVersion;
this.lossHandler = lossHandler; this.lossHandler = lossHandler;
this.requestedMetadataVersion = orgMetadataVersion; this.requestedMetadataVersion = orgMetadataVersion;
this.isEligibleLeaderReplicasEnabled = isEligibleLeaderReplicasEnabled;
} }
public MetadataVersion metadataVersion() { public MetadataVersion metadataVersion() {
return metadataVersion; return metadataVersion;
} }
public boolean isEligibleLeaderReplicasEnabled() {
return isEligibleLeaderReplicasEnabled;
}
public void handleLoss(String loss) { public void handleLoss(String loss) {
lossHandler.accept(new UnwritableMetadataException(requestedMetadataVersion, loss)); lossHandler.accept(new UnwritableMetadataException(requestedMetadataVersion, loss));

View File

@ -387,12 +387,17 @@ public class PartitionRegistration {
setLeaderRecoveryState(leaderRecoveryState.value()). setLeaderRecoveryState(leaderRecoveryState.value()).
setLeaderEpoch(leaderEpoch). setLeaderEpoch(leaderEpoch).
setPartitionEpoch(partitionEpoch); setPartitionEpoch(partitionEpoch);
if (options.metadataVersion().isElrSupported()) { if (options.isEligibleLeaderReplicasEnabled()) {
// The following are tagged fields, we should only set them when there are some contents, in order to save // The following are tagged fields, we should only set them when there are some contents, in order to save
// spaces. // spaces.
if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr)); if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr));
if (lastKnownElr.length > 0) record.setLastKnownElr(Replicas.toList(lastKnownElr)); if (lastKnownElr.length > 0) record.setLastKnownElr(Replicas.toList(lastKnownElr));
} }
if (options.metadataVersion() == null) {
options.handleLoss("the metadata version");
return new ApiMessageAndVersion(record, (short) 0);
}
if (options.metadataVersion().isDirectoryAssignmentSupported()) { if (options.metadataVersion().isDirectoryAssignmentSupported()) {
record.setDirectories(Uuid.toList(directories)); record.setDirectories(Uuid.toList(directories));
} else { } else {

View File

@ -22,6 +22,7 @@ import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter; import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -166,4 +167,19 @@ public class FeaturesImageTest {
MetadataVersion.IBP_3_3_IV0).isEmpty()); MetadataVersion.IBP_3_3_IV0).isEmpty());
assertTrue(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(), FeaturesImage.EMPTY.metadataVersion()).isEmpty()); assertTrue(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(), FeaturesImage.EMPTY.metadataVersion()).isEmpty());
} }
@Test
public void testElrEnabled() {
FeaturesImage image1 = new FeaturesImage(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_0.featureLevel()),
MetadataVersion.latestTesting()
);
assertFalse(image1.isElrEnabled());
FeaturesImage image2 = new FeaturesImage(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
MetadataVersion.latestTesting()
);
assertTrue(image2.isElrEnabled());
}
} }

View File

@ -17,10 +17,26 @@
package org.apache.kafka.image.writer; package org.apache.kafka.image.writer;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.AclsImageTest;
import org.apache.kafka.image.ClientQuotasImageTest;
import org.apache.kafka.image.ClusterImageTest;
import org.apache.kafka.image.ConfigurationsImageTest;
import org.apache.kafka.image.DelegationTokenImageTest;
import org.apache.kafka.image.FeaturesDelta;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.ProducerIdsImageTest;
import org.apache.kafka.image.ScramImageTest;
import org.apache.kafka.image.TopicsImageTest;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -71,4 +87,53 @@ public class ImageWriterOptionsTest {
options.handleLoss(expectedMessage); options.handleLoss(expectedMessage);
} }
} }
@Test
public void testSetEligibleLeaderReplicasEnabled() {
MetadataVersion version = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(version).
setEligibleLeaderReplicasEnabled(true).build();
assertEquals(true, options.isEligibleLeaderReplicasEnabled());
options = new ImageWriterOptions.Builder().
setMetadataVersion(version).build();
assertEquals(false, options.isEligibleLeaderReplicasEnabled());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConstructionWithImage(boolean isElrEnabled) {
FeaturesDelta featuresDelta = new FeaturesDelta(FeaturesImage.EMPTY);
featuresDelta.replay(new FeatureLevelRecord().
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
setFeatureLevel(isElrEnabled ?
EligibleLeaderReplicasVersion.ELRV_1.featureLevel() : EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
)
);
featuresDelta.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel())
);
MetadataImage metadataImage = new MetadataImage(
new MetadataProvenance(100, 4, 2000, true),
featuresDelta.apply(),
ClusterImageTest.IMAGE1,
TopicsImageTest.IMAGE1,
ConfigurationsImageTest.IMAGE1,
ClientQuotasImageTest.IMAGE1,
ProducerIdsImageTest.IMAGE1,
AclsImageTest.IMAGE1,
ScramImageTest.IMAGE1,
DelegationTokenImageTest.IMAGE1
);
ImageWriterOptions options = new ImageWriterOptions.Builder(metadataImage).build();
assertEquals(MetadataVersion.IBP_4_0_IV1, options.metadataVersion());
if (isElrEnabled) {
assertEquals(true, options.isEligibleLeaderReplicasEnabled());
} else {
assertEquals(false, options.isEligibleLeaderReplicasEnabled());
}
}
} }

View File

@ -338,6 +338,7 @@ public class PartitionRegistrationTest {
List<UnwritableMetadataException> exceptions = new ArrayList<>(); List<UnwritableMetadataException> exceptions = new ArrayList<>();
ImageWriterOptions options = new ImageWriterOptions.Builder(). ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(metadataVersion). setMetadataVersion(metadataVersion).
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
setLossHandler(exceptions::add). setLossHandler(exceptions::add).
build(); build();
assertEquals(new ApiMessageAndVersion(expectRecord, metadataVersion.partitionRecordVersion()), assertEquals(new ApiMessageAndVersion(expectRecord, metadataVersion.partitionRecordVersion()),

View File

@ -29,6 +29,7 @@ import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.raft.DynamicVoters; import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.GroupVersion; import org.apache.kafka.server.common.GroupVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
@ -341,6 +342,9 @@ public class FormatterTest {
setName(MetadataVersion.FEATURE_NAME). setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
(short) 0)); (short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(GroupVersion.FEATURE_NAME). setName(GroupVersion.FEATURE_NAME).
setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0)); setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));

View File

@ -65,10 +65,6 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion {
return dependencies; return dependencies;
} }
public boolean isEligibleLeaderReplicasFeatureEnabeld() {
return featureLevel >= ELRV_1.featureLevel;
}
public static EligibleLeaderReplicasVersion fromFeatureLevel(short version) { public static EligibleLeaderReplicasVersion fromFeatureLevel(short version) {
switch (version) { switch (version) {
case 0: case 0:

View File

@ -113,15 +113,17 @@ public enum MetadataVersion {
// Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848). // Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848).
IBP_4_0_IV0(22, "4.0", "IV0", false), IBP_4_0_IV0(22, "4.0", "IV0", false),
// Add ELR related supports (KIP-966).
// PartitionRecord and PartitionChangeRecord are updated.
// ClearElrRecord is added.
IBP_4_0_IV1(23, "4.0", "IV1", true),
// //
// NOTE: MetadataVersions after this point are unstable and may be changed. // NOTE: MetadataVersions after this point are unstable and may be changed.
// If users attempt to use an unstable MetadataVersion, they will get an error. // If users attempt to use an unstable MetadataVersion, they will get an error.
// Please move this comment when updating the LATEST_PRODUCTION constant. // Please move this comment when updating the LATEST_PRODUCTION constant.
// //
// Add ELR related supports (KIP-966).
IBP_4_0_IV1(23, "4.0", "IV1", true),
// Bootstrap metadata version for transaction versions 1 and 2 (KIP-890) // Bootstrap metadata version for transaction versions 1 and 2 (KIP-890)
IBP_4_0_IV2(24, "4.0", "IV2", false), IBP_4_0_IV2(24, "4.0", "IV2", false),
@ -152,7 +154,7 @@ public enum MetadataVersion {
* <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION,
* IT CANNOT BE CHANGED.</strong> * IT CANNOT BE CHANGED.</strong>
*/ */
public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV0; public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV1;
// If you change the value above please also update // If you change the value above please also update
// LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py // LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py

View File

@ -114,7 +114,7 @@ DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT")
LATEST_STABLE_TRANSACTION_VERSION = 2 LATEST_STABLE_TRANSACTION_VERSION = 2
# This should match the LATEST_PRODUCTION version defined in MetadataVersion.java # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java
LATEST_STABLE_METADATA_VERSION = "4.0-IV0" LATEST_STABLE_METADATA_VERSION = "4.0-IV1"
# 0.11.0.x versions # 0.11.0.x versions
V_0_11_0_3 = KafkaVersion("0.11.0.3") V_0_11_0_3 = KafkaVersion("0.11.0.3")