diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java index 8697ad00962..ebeab43b4f7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java @@ -494,12 +494,10 @@ public class PartitionChangeBuilder { private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) { if (!useLastKnownLeaderInBalancedRecovery || !eligibleLeaderReplicasEnabled) return; - if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 || - partition.lastKnownElr[0] != partition.leader)) { + if (record.leader() == NO_LEADER && partition.lastKnownElr.length == 0) { // Only update the last known leader when the first time the partition becomes leaderless. record.setLastKnownElr(List.of(partition.leader)); - } else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER)) - && partition.lastKnownElr.length > 0) { + } else if (record.leader() >= 0 && partition.lastKnownElr.length > 0) { // Clear the LastKnownElr field if the partition will have or continues to have a valid leader. record.setLastKnownElr(List.of()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index 312a207f8d7..e901079ec35 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -822,6 +822,48 @@ public class PartitionChangeBuilderTest { } } + @Test + public void testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader() { + PartitionRegistration partition = new PartitionRegistration.Builder() + .setReplicas(new int[] {1, 2, 3}) + .setDirectories(new Uuid[] { + DirectoryId.UNASSIGNED, + DirectoryId.UNASSIGNED, + DirectoryId.UNASSIGNED + }) + .setIsr(new int[] {1}) + .setElr(new int[] {2}) + .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(100) + .setPartitionEpoch(200) + .build(); + + short version = 2; // ELR supported + Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); + + // No replica is acceptable as leader, so election yields NO_LEADER. + // We intentionally do not change target ISR so record.isr remains null. + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, + metadataVersionForPartitionChangeRecordVersion(version), 3) + .setElection(Election.PREFERRED) + .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) + .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) + .setUseLastKnownLeaderInBalancedRecovery(true); + + ApiMessageAndVersion change = builder.build().get(); + PartitionChangeRecord record = (PartitionChangeRecord) change.message(); + + assertEquals(NO_LEADER, record.leader()); + // There is no ISR update if we do not perform the leader verification on the ISR members. + assertNull(record.isr(), record.toString()); + assertEquals(1, record.lastKnownElr().size(), record.toString()); + assertEquals(1, record.lastKnownElr().get(0), record.toString()); + partition = partition.merge((PartitionChangeRecord) builder.build().get().message()); + assertArrayEquals(new int[] {1}, partition.lastKnownElr); + } + + @ParameterizedTest @MethodSource("partitionChangeRecordVersions") public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {