mirror of https://github.com/apache/kafka.git
MINOR: simplify last known elr update (#20629)
Simplify the last known elr update logic. This way can make a more robust logic. Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
f68a149a18
commit
162db130f6
|
@ -494,12 +494,10 @@ public class PartitionChangeBuilder {
|
|||
|
||||
private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
|
||||
if (!useLastKnownLeaderInBalancedRecovery || !eligibleLeaderReplicasEnabled) return;
|
||||
if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 ||
|
||||
partition.lastKnownElr[0] != partition.leader)) {
|
||||
if (record.leader() == NO_LEADER && partition.lastKnownElr.length == 0) {
|
||||
// Only update the last known leader when the first time the partition becomes leaderless.
|
||||
record.setLastKnownElr(List.of(partition.leader));
|
||||
} else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))
|
||||
&& partition.lastKnownElr.length > 0) {
|
||||
} else if (record.leader() >= 0 && partition.lastKnownElr.length > 0) {
|
||||
// Clear the LastKnownElr field if the partition will have or continues to have a valid leader.
|
||||
record.setLastKnownElr(List.of());
|
||||
}
|
||||
|
|
|
@ -822,6 +822,48 @@ public class PartitionChangeBuilderTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader() {
|
||||
PartitionRegistration partition = new PartitionRegistration.Builder()
|
||||
.setReplicas(new int[] {1, 2, 3})
|
||||
.setDirectories(new Uuid[] {
|
||||
DirectoryId.UNASSIGNED,
|
||||
DirectoryId.UNASSIGNED,
|
||||
DirectoryId.UNASSIGNED
|
||||
})
|
||||
.setIsr(new int[] {1})
|
||||
.setElr(new int[] {2})
|
||||
.setLeader(1)
|
||||
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
|
||||
.setLeaderEpoch(100)
|
||||
.setPartitionEpoch(200)
|
||||
.build();
|
||||
|
||||
short version = 2; // ELR supported
|
||||
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
|
||||
|
||||
// No replica is acceptable as leader, so election yields NO_LEADER.
|
||||
// We intentionally do not change target ISR so record.isr remains null.
|
||||
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
|
||||
metadataVersionForPartitionChangeRecordVersion(version), 3)
|
||||
.setElection(Election.PREFERRED)
|
||||
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
|
||||
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
|
||||
.setUseLastKnownLeaderInBalancedRecovery(true);
|
||||
|
||||
ApiMessageAndVersion change = builder.build().get();
|
||||
PartitionChangeRecord record = (PartitionChangeRecord) change.message();
|
||||
|
||||
assertEquals(NO_LEADER, record.leader());
|
||||
// There is no ISR update if we do not perform the leader verification on the ISR members.
|
||||
assertNull(record.isr(), record.toString());
|
||||
assertEquals(1, record.lastKnownElr().size(), record.toString());
|
||||
assertEquals(1, record.lastKnownElr().get(0), record.toString());
|
||||
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
|
||||
assertArrayEquals(new int[] {1}, partition.lastKnownElr);
|
||||
}
|
||||
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("partitionChangeRecordVersions")
|
||||
public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {
|
||||
|
|
Loading…
Reference in New Issue