diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java index 1c110e567a5..78c1c2363d7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java @@ -81,6 +81,8 @@ public class PartitionChangeBuilder { private List targetAdding; private Election election = Election.ONLINE; private LeaderRecoveryState targetLeaderRecoveryState; + private boolean bumpLeaderEpochOnIsrShrink; + public PartitionChangeBuilder( PartitionRegistration partition, @@ -94,6 +96,8 @@ public class PartitionChangeBuilder { this.partitionId = partitionId; this.isAcceptableLeader = isAcceptableLeader; this.metadataVersion = metadataVersion; + this.bumpLeaderEpochOnIsrShrink = !metadataVersion.isSkipLeaderEpochBumpSupported(); + this.targetIsr = Replicas.toList(partition.isr); this.targetReplicas = Replicas.toList(partition.replicas); this.targetRemoving = Replicas.toList(partition.removingReplicas); @@ -140,6 +144,11 @@ public class PartitionChangeBuilder { return this; } + public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean bumpLeaderEpochOnIsrShrink) { + this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink; + return this; + } + // VisibleForTesting static class ElectionResult { final int node; @@ -273,13 +282,15 @@ public class PartitionChangeBuilder { * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if * the ISR expanded. + * + * In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must + * be bumped during ISR shrink for compatibility with ZK brokers. 
*/ void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) { if (record.leader() == NO_LEADER_CHANGE) { if (!Replicas.contains(targetReplicas, partition.replicas)) { record.setLeader(partition.leader); - } else if (!metadataVersion.isSkipLeaderEpochBumpSupported() && - !Replicas.contains(targetIsr, partition.isr)) { + } else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) { record.setLeader(partition.leader); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index f7d5e69ed8b..7af6ab8b317 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -995,6 +995,7 @@ public class ReplicationControlManager { clusterControl::isActive, featureControl.metadataVersion() ); + builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } @@ -1382,7 +1383,7 @@ public class ReplicationControlManager { clusterControl::isActive, featureControl.metadataVersion() ); - builder.setElection(election); + builder.setElection(election).setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed()); Optional record = builder.build(); if (!record.isPresent()) { if (electionType == ElectionType.PREFERRED) { @@ -1517,7 +1518,8 @@ public class ReplicationControlManager { clusterControl::isActive, featureControl.metadataVersion() ); - builder.setElection(PartitionChangeBuilder.Election.PREFERRED); + builder.setElection(PartitionChangeBuilder.Election.PREFERRED) + .setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed()); builder.build().ifPresent(records::add); } @@ -1738,6 +1740,7 @@ public class 
ReplicationControlManager { isAcceptableLeader, featureControl.metadataVersion() ); + builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } @@ -1850,6 +1853,7 @@ public class ReplicationControlManager { clusterControl::isActive, featureControl.metadataVersion() ); + builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } @@ -1907,6 +1911,7 @@ public class ReplicationControlManager { clusterControl::isActive, featureControl.metadataVersion() ); + builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed()); if (!reassignment.replicas().equals(currentReplicas)) { builder.setTargetReplicas(reassignment.replicas()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index 3374f12d7ac..c707642bebd 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -31,12 +31,14 @@ import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; import java.util.Optional; import java.util.function.IntPredicate; +import java.util.stream.Stream; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; import static 
org.apache.kafka.controller.PartitionChangeBuilder.Election; @@ -46,6 +48,7 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; @Timeout(value = 40) @@ -200,6 +203,14 @@ public class PartitionChangeBuilderTest { new PartitionChangeRecord(), NO_LEADER_CHANGE ); + testTriggerLeaderEpochBumpIfNeededLeader( + createFooBuilder() + .setTargetIsrWithBrokerStates( + AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))) + .setBumpLeaderEpochOnIsrShrink(true), + new PartitionChangeRecord(), + NO_LEADER_CHANGE + ); testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(). setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(), NO_LEADER_CHANGE); @@ -217,6 +228,16 @@ public class PartitionChangeBuilderTest { new PartitionChangeRecord(), 1 ); + + // KAFKA-15109: Shrinking the ISR while in ZK migration mode does increase the leader epoch + testTriggerLeaderEpochBumpIfNeededLeader( + createFooBuilder() + .setTargetIsrWithBrokerStates( + AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))) + .setBumpLeaderEpochOnIsrShrink(true), + new PartitionChangeRecord(), + 1 + ); } @Test @@ -382,9 +403,18 @@ public class PartitionChangeBuilderTest { ); } + private static Stream leaderRecoveryAndZkMigrationParams() { + return Stream.of( + arguments(true, true), + arguments(true, false), + arguments(false, true), + arguments(false, false) + ); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean isLeaderRecoverySupported) { + @MethodSource("leaderRecoveryAndZkMigrationParams") + public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean 
isLeaderRecoverySupported, boolean zkMigrationsEnabled) { final byte noChange = (byte) -1; int leaderId = 1; LeaderRecoveryState recoveryState = LeaderRecoveryState.RECOVERING; @@ -407,6 +437,7 @@ public class PartitionChangeBuilderTest { brokerId -> false, metadataVersion ); + offlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled); // Set the target ISR to empty to indicate that the last leader is offline offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList()); @@ -432,6 +463,7 @@ public class PartitionChangeBuilderTest { brokerId -> true, metadataVersion ); + onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled); // The only broker in the ISR is elected leader and stays in the recovering changeRecord = (PartitionChangeRecord) onlineBuilder.build().get().message(); @@ -445,8 +477,8 @@ public class PartitionChangeBuilderTest { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testUncleanSetsLeaderRecoveringState(boolean isLeaderRecoverySupported) { + @MethodSource("leaderRecoveryAndZkMigrationParams") + void testUncleanSetsLeaderRecoveringState(boolean isLeaderRecoverySupported, boolean zkMigrationsEnabled) { final byte noChange = (byte) -1; int leaderId = 1; PartitionRegistration registration = new PartitionRegistration.Builder(). @@ -468,7 +500,7 @@ public class PartitionChangeBuilderTest { brokerId -> brokerId == leaderId, metadataVersion ).setElection(Election.UNCLEAN); - + onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled); // The partition should stay as recovering PartitionChangeRecord changeRecord = (PartitionChangeRecord) onlineBuilder .build()