mirror of https://github.com/apache/kafka.git
KAFKA-15109 Don't skip leader epoch bump while in migration mode (#13890)
While in migration mode, the KRaft controller must always bump the leader epoch when shrinking an ISR. This is required to maintain compatibility with the ZK brokers. Without the epoch bump, the ZK brokers will ignore the partition state change present in the LeaderAndIsrRequest since it would not contain a new leader epoch. Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
88e784f7c6
commit
d0457f7360
|
@ -81,6 +81,8 @@ public class PartitionChangeBuilder {
|
|||
private List<Integer> targetAdding;
|
||||
private Election election = Election.ONLINE;
|
||||
private LeaderRecoveryState targetLeaderRecoveryState;
|
||||
private boolean bumpLeaderEpochOnIsrShrink;
|
||||
|
||||
|
||||
public PartitionChangeBuilder(
|
||||
PartitionRegistration partition,
|
||||
|
@ -94,6 +96,8 @@ public class PartitionChangeBuilder {
|
|||
this.partitionId = partitionId;
|
||||
this.isAcceptableLeader = isAcceptableLeader;
|
||||
this.metadataVersion = metadataVersion;
|
||||
this.bumpLeaderEpochOnIsrShrink = !metadataVersion.isSkipLeaderEpochBumpSupported();
|
||||
|
||||
this.targetIsr = Replicas.toList(partition.isr);
|
||||
this.targetReplicas = Replicas.toList(partition.replicas);
|
||||
this.targetRemoving = Replicas.toList(partition.removingReplicas);
|
||||
|
@ -140,6 +144,11 @@ public class PartitionChangeBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean bumpLeaderEpochOnIsrShrink) {
|
||||
this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
|
||||
return this;
|
||||
}
|
||||
|
||||
// VisibleForTesting
|
||||
static class ElectionResult {
|
||||
final int node;
|
||||
|
@ -273,13 +282,15 @@ public class PartitionChangeBuilder {
|
|||
* that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
|
||||
* bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
|
||||
* the ISR expanded.
|
||||
*
|
||||
* In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must
|
||||
* be bumped during ISR shrink for compatability with ZK brokers.
|
||||
*/
|
||||
void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
|
||||
if (record.leader() == NO_LEADER_CHANGE) {
|
||||
if (!Replicas.contains(targetReplicas, partition.replicas)) {
|
||||
record.setLeader(partition.leader);
|
||||
} else if (!metadataVersion.isSkipLeaderEpochBumpSupported() &&
|
||||
!Replicas.contains(targetIsr, partition.isr)) {
|
||||
} else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) {
|
||||
record.setLeader(partition.leader);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -995,6 +995,7 @@ public class ReplicationControlManager {
|
|||
clusterControl::isActive,
|
||||
featureControl.metadataVersion()
|
||||
);
|
||||
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
|
||||
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
|
||||
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
|
||||
}
|
||||
|
@ -1382,7 +1383,7 @@ public class ReplicationControlManager {
|
|||
clusterControl::isActive,
|
||||
featureControl.metadataVersion()
|
||||
);
|
||||
builder.setElection(election);
|
||||
builder.setElection(election).setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
|
||||
Optional<ApiMessageAndVersion> record = builder.build();
|
||||
if (!record.isPresent()) {
|
||||
if (electionType == ElectionType.PREFERRED) {
|
||||
|
@ -1517,7 +1518,8 @@ public class ReplicationControlManager {
|
|||
clusterControl::isActive,
|
||||
featureControl.metadataVersion()
|
||||
);
|
||||
builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
|
||||
builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
|
||||
.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
|
||||
builder.build().ifPresent(records::add);
|
||||
}
|
||||
|
||||
|
@ -1738,6 +1740,7 @@ public class ReplicationControlManager {
|
|||
isAcceptableLeader,
|
||||
featureControl.metadataVersion()
|
||||
);
|
||||
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
|
||||
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
|
||||
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
|
||||
}
|
||||
|
@ -1850,6 +1853,7 @@ public class ReplicationControlManager {
|
|||
clusterControl::isActive,
|
||||
featureControl.metadataVersion()
|
||||
);
|
||||
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
|
||||
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
|
||||
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
|
||||
}
|
||||
|
@ -1907,6 +1911,7 @@ public class ReplicationControlManager {
|
|||
clusterControl::isActive,
|
||||
featureControl.metadataVersion()
|
||||
);
|
||||
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
|
||||
if (!reassignment.replicas().equals(currentReplicas)) {
|
||||
builder.setTargetReplicas(reassignment.replicas());
|
||||
}
|
||||
|
|
|
@ -31,12 +31,14 @@ import org.apache.kafka.server.common.MetadataVersion;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import java.util.function.IntPredicate;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
|
||||
import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
|
||||
|
@ -46,6 +48,7 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.params.provider.Arguments.arguments;
|
||||
|
||||
|
||||
@Timeout(value = 40)
|
||||
|
@ -200,6 +203,14 @@ public class PartitionChangeBuilderTest {
|
|||
new PartitionChangeRecord(),
|
||||
NO_LEADER_CHANGE
|
||||
);
|
||||
testTriggerLeaderEpochBumpIfNeededLeader(
|
||||
createFooBuilder()
|
||||
.setTargetIsrWithBrokerStates(
|
||||
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4)))
|
||||
.setBumpLeaderEpochOnIsrShrink(true),
|
||||
new PartitionChangeRecord(),
|
||||
NO_LEADER_CHANGE
|
||||
);
|
||||
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
|
||||
setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
|
||||
NO_LEADER_CHANGE);
|
||||
|
@ -217,6 +228,16 @@ public class PartitionChangeBuilderTest {
|
|||
new PartitionChangeRecord(),
|
||||
1
|
||||
);
|
||||
|
||||
// KAFKA-15109: Shrinking the ISR while in ZK migration mode does increase the leader epoch
|
||||
testTriggerLeaderEpochBumpIfNeededLeader(
|
||||
createFooBuilder()
|
||||
.setTargetIsrWithBrokerStates(
|
||||
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
|
||||
.setBumpLeaderEpochOnIsrShrink(true),
|
||||
new PartitionChangeRecord(),
|
||||
1
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -382,9 +403,18 @@ public class PartitionChangeBuilderTest {
|
|||
);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> leaderRecoveryAndZkMigrationParams() {
|
||||
return Stream.of(
|
||||
arguments(true, true),
|
||||
arguments(true, false),
|
||||
arguments(false, true),
|
||||
arguments(false, false)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean isLeaderRecoverySupported) {
|
||||
@MethodSource("leaderRecoveryAndZkMigrationParams")
|
||||
public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
|
||||
final byte noChange = (byte) -1;
|
||||
int leaderId = 1;
|
||||
LeaderRecoveryState recoveryState = LeaderRecoveryState.RECOVERING;
|
||||
|
@ -407,6 +437,7 @@ public class PartitionChangeBuilderTest {
|
|||
brokerId -> false,
|
||||
metadataVersion
|
||||
);
|
||||
offlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
|
||||
// Set the target ISR to empty to indicate that the last leader is offline
|
||||
offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
|
||||
|
||||
|
@ -432,6 +463,7 @@ public class PartitionChangeBuilderTest {
|
|||
brokerId -> true,
|
||||
metadataVersion
|
||||
);
|
||||
onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
|
||||
|
||||
// The only broker in the ISR is elected leader and stays in the recovering
|
||||
changeRecord = (PartitionChangeRecord) onlineBuilder.build().get().message();
|
||||
|
@ -445,8 +477,8 @@ public class PartitionChangeBuilderTest {
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
void testUncleanSetsLeaderRecoveringState(boolean isLeaderRecoverySupported) {
|
||||
@MethodSource("leaderRecoveryAndZkMigrationParams")
|
||||
void testUncleanSetsLeaderRecoveringState(boolean isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
|
||||
final byte noChange = (byte) -1;
|
||||
int leaderId = 1;
|
||||
PartitionRegistration registration = new PartitionRegistration.Builder().
|
||||
|
@ -468,7 +500,7 @@ public class PartitionChangeBuilderTest {
|
|||
brokerId -> brokerId == leaderId,
|
||||
metadataVersion
|
||||
).setElection(Election.UNCLEAN);
|
||||
|
||||
onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
|
||||
// The partition should stay as recovering
|
||||
PartitionChangeRecord changeRecord = (PartitionChangeRecord) onlineBuilder
|
||||
.build()
|
||||
|
|
Loading…
Reference in New Issue