KAFKA-18634: Fix ELR metadata version issues (#18680)

This patch cleans up the places that should not use MV to determine ELR is enabled marks 4.0IV1 stable.

Reviewers: Alyssa Huang <ahuang@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Calvin Liu 2025-02-13 23:40:31 -08:00 committed by David Jacot
parent 91958fce6a
commit ba067caa54
13 changed files with 131 additions and 15 deletions

View File

@ -849,6 +849,7 @@ public class ReplicationControlManager {
PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
build()));
}
return ApiError.NONE;
@ -1468,7 +1469,7 @@ public class ReplicationControlManager {
* @param records The record list to append to.
*/
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isElrSupported() && !isCleanShutdown) {
if (featureControl.isElrFeatureEnabled() && !isCleanShutdown) {
// ELR is enabled, generate unclean shutdown partition change records
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
@ -1942,6 +1943,7 @@ public class ReplicationControlManager {
records.add(buildPartitionRegistration(partitionAssignment, isr)
.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
build()));
partitionId++;
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.node.FeaturesImageNode;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
@ -67,6 +68,11 @@ public final class FeaturesImage {
return finalizedVersions;
}
public boolean isElrEnabled() {
return finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_0.featureLevel())
>= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}
private Optional<Short> finalizedVersion(String feature) {
return Optional.ofNullable(finalizedVersions.get(feature));
}

View File

@ -295,6 +295,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
ImageReWriter writer = new ImageReWriter(delta);
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()).
setEligibleLeaderReplicasEnabled(image.features().isElrEnabled()).
build());
// ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here.
SnapshotManifest manifest = new SnapshotManifest(

View File

@ -148,6 +148,7 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
try {
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()).
setEligibleLeaderReplicasEnabled(image.features().isElrEnabled()).
build());
writer.close(true);
metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds());

View File

@ -30,6 +30,7 @@ public final class ImageWriterOptions {
public static class Builder {
private MetadataVersion metadataVersion;
private MetadataVersion requestedMetadataVersion;
private boolean isEligibleLeaderReplicasEnabled = false;
private Consumer<UnwritableMetadataException> lossHandler = e -> {
throw e;
};
@ -40,6 +41,7 @@ public final class ImageWriterOptions {
public Builder(MetadataImage image) {
this.metadataVersion = image.features().metadataVersion();
this.isEligibleLeaderReplicasEnabled = image.features().isElrEnabled();
}
public Builder setMetadataVersion(MetadataVersion metadataVersion) {
@ -54,6 +56,11 @@ public final class ImageWriterOptions {
return this;
}
public Builder setEligibleLeaderReplicasEnabled(boolean isEligibleLeaderReplicasEnabled) {
this.isEligibleLeaderReplicasEnabled = isEligibleLeaderReplicasEnabled;
return this;
}
public MetadataVersion metadataVersion() {
return metadataVersion;
}
@ -62,33 +69,43 @@ public final class ImageWriterOptions {
return requestedMetadataVersion;
}
public boolean isEligibleLeaderReplicasEnabled() {
return isEligibleLeaderReplicasEnabled;
}
public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) {
this.lossHandler = lossHandler;
return this;
}
public ImageWriterOptions build() {
return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion);
return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion, isEligibleLeaderReplicasEnabled);
}
}
private final MetadataVersion metadataVersion;
private final MetadataVersion requestedMetadataVersion;
private final Consumer<UnwritableMetadataException> lossHandler;
private final boolean isEligibleLeaderReplicasEnabled;
private ImageWriterOptions(
MetadataVersion metadataVersion,
Consumer<UnwritableMetadataException> lossHandler,
MetadataVersion orgMetadataVersion
MetadataVersion orgMetadataVersion,
boolean isEligibleLeaderReplicasEnabled
) {
this.metadataVersion = metadataVersion;
this.lossHandler = lossHandler;
this.requestedMetadataVersion = orgMetadataVersion;
this.isEligibleLeaderReplicasEnabled = isEligibleLeaderReplicasEnabled;
}
public MetadataVersion metadataVersion() {
return metadataVersion;
}
public boolean isEligibleLeaderReplicasEnabled() {
return isEligibleLeaderReplicasEnabled;
}
public void handleLoss(String loss) {
lossHandler.accept(new UnwritableMetadataException(requestedMetadataVersion, loss));

View File

@ -387,12 +387,17 @@ public class PartitionRegistration {
setLeaderRecoveryState(leaderRecoveryState.value()).
setLeaderEpoch(leaderEpoch).
setPartitionEpoch(partitionEpoch);
if (options.metadataVersion().isElrSupported()) {
if (options.isEligibleLeaderReplicasEnabled()) {
// The following are tagged fields, we should only set them when there are some contents, in order to save
// spaces.
if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr));
if (lastKnownElr.length > 0) record.setLastKnownElr(Replicas.toList(lastKnownElr));
}
if (options.metadataVersion() == null) {
options.handleLoss("the metadata version");
return new ApiMessageAndVersion(record, (short) 0);
}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
record.setDirectories(Uuid.toList(directories));
} else {

View File

@ -22,6 +22,7 @@ import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
@ -166,4 +167,19 @@ public class FeaturesImageTest {
MetadataVersion.IBP_3_3_IV0).isEmpty());
assertTrue(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(), FeaturesImage.EMPTY.metadataVersion()).isEmpty());
}
@Test
public void testElrEnabled() {
FeaturesImage image1 = new FeaturesImage(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_0.featureLevel()),
MetadataVersion.latestTesting()
);
assertFalse(image1.isElrEnabled());
FeaturesImage image2 = new FeaturesImage(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
MetadataVersion.latestTesting()
);
assertTrue(image2.isElrEnabled());
}
}

View File

@ -17,10 +17,26 @@
package org.apache.kafka.image.writer;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.AclsImageTest;
import org.apache.kafka.image.ClientQuotasImageTest;
import org.apache.kafka.image.ClusterImageTest;
import org.apache.kafka.image.ConfigurationsImageTest;
import org.apache.kafka.image.DelegationTokenImageTest;
import org.apache.kafka.image.FeaturesDelta;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.ProducerIdsImageTest;
import org.apache.kafka.image.ScramImageTest;
import org.apache.kafka.image.TopicsImageTest;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.function.Consumer;
@ -71,4 +87,53 @@ public class ImageWriterOptionsTest {
options.handleLoss(expectedMessage);
}
}
@Test
public void testSetEligibleLeaderReplicasEnabled() {
MetadataVersion version = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(version).
setEligibleLeaderReplicasEnabled(true).build();
assertEquals(true, options.isEligibleLeaderReplicasEnabled());
options = new ImageWriterOptions.Builder().
setMetadataVersion(version).build();
assertEquals(false, options.isEligibleLeaderReplicasEnabled());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConstructionWithImage(boolean isElrEnabled) {
FeaturesDelta featuresDelta = new FeaturesDelta(FeaturesImage.EMPTY);
featuresDelta.replay(new FeatureLevelRecord().
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
setFeatureLevel(isElrEnabled ?
EligibleLeaderReplicasVersion.ELRV_1.featureLevel() : EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
)
);
featuresDelta.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel())
);
MetadataImage metadataImage = new MetadataImage(
new MetadataProvenance(100, 4, 2000, true),
featuresDelta.apply(),
ClusterImageTest.IMAGE1,
TopicsImageTest.IMAGE1,
ConfigurationsImageTest.IMAGE1,
ClientQuotasImageTest.IMAGE1,
ProducerIdsImageTest.IMAGE1,
AclsImageTest.IMAGE1,
ScramImageTest.IMAGE1,
DelegationTokenImageTest.IMAGE1
);
ImageWriterOptions options = new ImageWriterOptions.Builder(metadataImage).build();
assertEquals(MetadataVersion.IBP_4_0_IV1, options.metadataVersion());
if (isElrEnabled) {
assertEquals(true, options.isEligibleLeaderReplicasEnabled());
} else {
assertEquals(false, options.isEligibleLeaderReplicasEnabled());
}
}
}

View File

@ -338,6 +338,7 @@ public class PartitionRegistrationTest {
List<UnwritableMetadataException> exceptions = new ArrayList<>();
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(metadataVersion).
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
setLossHandler(exceptions::add).
build();
assertEquals(new ApiMessageAndVersion(expectRecord, metadataVersion.partitionRecordVersion()),

View File

@ -29,6 +29,7 @@ import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.GroupVersion;
import org.apache.kafka.server.common.MetadataVersion;
@ -341,6 +342,9 @@ public class FormatterTest {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
(short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(GroupVersion.FEATURE_NAME).
setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));

View File

@ -65,10 +65,6 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion {
return dependencies;
}
public boolean isEligibleLeaderReplicasFeatureEnabeld() {
return featureLevel >= ELRV_1.featureLevel;
}
public static EligibleLeaderReplicasVersion fromFeatureLevel(short version) {
switch (version) {
case 0:

View File

@ -113,15 +113,17 @@ public enum MetadataVersion {
// Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848).
IBP_4_0_IV0(22, "4.0", "IV0", false),
// Add ELR related supports (KIP-966).
// PartitionRecord and PartitionChangeRecord are updated.
// ClearElrRecord is added.
IBP_4_0_IV1(23, "4.0", "IV1", true),
//
// NOTE: MetadataVersions after this point are unstable and may be changed.
// If users attempt to use an unstable MetadataVersion, they will get an error.
// Please move this comment when updating the LATEST_PRODUCTION constant.
//
// Add ELR related supports (KIP-966).
IBP_4_0_IV1(23, "4.0", "IV1", true),
// Bootstrap metadata version for transaction versions 1 and 2 (KIP-890)
IBP_4_0_IV2(24, "4.0", "IV2", false),
@ -152,7 +154,7 @@ public enum MetadataVersion {
* <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION,
* IT CANNOT BE CHANGED.</strong>
*/
public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV0;
public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV1;
// If you change the value above please also update
// LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py

View File

@ -114,7 +114,7 @@ DEV_VERSION = KafkaVersion("4.0.0-SNAPSHOT")
LATEST_STABLE_TRANSACTION_VERSION = 2
# This should match the LATEST_PRODUCTION version defined in MetadataVersion.java
LATEST_STABLE_METADATA_VERSION = "4.0-IV0"
LATEST_STABLE_METADATA_VERSION = "4.0-IV1"
# 0.11.0.x versions
V_0_11_0_3 = KafkaVersion("0.11.0.3")