KAFKA-19212: Correct the unclean leader election metric calculation (#19590)
CI / build (push) Waiting to run Details

The current ElectionWasClean checks if the new leader is in the previous
ISR. However, there is a corner case in the partition reassignment.  The
partition reassignment can change the partition replicas. If the new
preferred leader (the first one in the new replicas) is the last one to
join ISR, this preferred leader will be elected in the same partition
change.

For example:  In the previous state, the partition is  Leader: 0,
Replicas (2,1,0), ISR (1,0), Adding(2), removing(0).  Then replica 2
joins the ISR. The new partition would be like:  Leader: 2, Replicas
(2,1), ISR(1,2).  The new leader 2 is not in the previous ISR (1,0) but
it is still a clean election.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Calvin Liu 2025-05-07 13:26:53 -07:00 committed by GitHub
parent 67b46fec15
commit 3094ce2c20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 115 additions and 44 deletions

View File

@ -334,11 +334,11 @@ public class PartitionChangeBuilder {
targetIsr = List.of(electionResult.node); targetIsr = List.of(electionResult.node);
targetElr = targetElr.stream().filter(replica -> replica != electionResult.node) targetElr = targetElr.stream().filter(replica -> replica != electionResult.node)
.collect(Collectors.toList()); .collect(Collectors.toList());
log.trace("Setting new leader for topicId {}, partition {} to {} using ELR", log.info("Setting new leader for topicId {}, partition {} to {} using ELR. Previous partition: {}, change record: {}",
topicId, partitionId, electionResult.node); topicId, partitionId, electionResult.node, partition, record);
} else if (electionResult.unclean) { } else if (electionResult.unclean) {
log.info("Setting new leader for topicId {}, partition {} to {} using an unclean election", log.info("Setting new leader for topicId {}, partition {} to {} using an unclean election. Previous partition: {}, change record: {}",
topicId, partitionId, electionResult.node); topicId, partitionId, electionResult.node, partition, record);
} else { } else {
log.trace("Setting new leader for topicId {}, partition {} to {} using a clean election", log.trace("Setting new leader for topicId {}, partition {} to {} using a clean election",
topicId, partitionId, electionResult.node); topicId, partitionId, electionResult.node);

View File

@ -48,7 +48,7 @@ class ControllerMetricsChanges {
private int offlinePartitionsChange = 0; private int offlinePartitionsChange = 0;
private int partitionsWithoutPreferredLeaderChange = 0; private int partitionsWithoutPreferredLeaderChange = 0;
private int uncleanLeaderElection = 0; private int uncleanLeaderElection = 0;
private int electionFromElrCounter = 0; private int electionFromElr = 0;
public int fencedBrokersChange() { public int fencedBrokersChange() {
return fencedBrokersChange; return fencedBrokersChange;
@ -70,6 +70,14 @@ class ControllerMetricsChanges {
return offlinePartitionsChange; return offlinePartitionsChange;
} }
public int uncleanLeaderElection() {
return uncleanLeaderElection;
}
public int electionFromElr() {
return electionFromElr;
}
public int partitionsWithoutPreferredLeaderChange() { public int partitionsWithoutPreferredLeaderChange() {
return partitionsWithoutPreferredLeaderChange; return partitionsWithoutPreferredLeaderChange;
} }
@ -105,10 +113,13 @@ class ControllerMetricsChanges {
} else { } else {
for (Entry<Integer, PartitionRegistration> entry : topicDelta.partitionChanges().entrySet()) { for (Entry<Integer, PartitionRegistration> entry : topicDelta.partitionChanges().entrySet()) {
int partitionId = entry.getKey(); int partitionId = entry.getKey();
PartitionRegistration prevPartition = prev.partitions().get(partitionId);
PartitionRegistration nextPartition = entry.getValue(); PartitionRegistration nextPartition = entry.getValue();
handlePartitionChange(prev.partitions().get(partitionId), nextPartition); handlePartitionChange(prevPartition, nextPartition);
} }
} }
topicDelta.partitionToUncleanLeaderElectionCount().forEach((partitionId, count) -> uncleanLeaderElection += count);
topicDelta.partitionToElrElectionCount().forEach((partitionId, count) -> electionFromElr += count);
} }
void handlePartitionChange(PartitionRegistration prev, PartitionRegistration next) { void handlePartitionChange(PartitionRegistration prev, PartitionRegistration next) {
@ -127,15 +138,6 @@ class ControllerMetricsChanges {
isPresent = true; isPresent = true;
isOffline = !next.hasLeader(); isOffline = !next.hasLeader();
isWithoutPreferredLeader = !next.hasPreferredLeader(); isWithoutPreferredLeader = !next.hasPreferredLeader();
// take current all replicas as ISR if prev is null (new created partition), so we won't treat it as unclean election.
int[] prevIsr = prev != null ? prev.isr : next.replicas;
int[] prevElr = prev != null ? prev.elr : new int[]{};
if (!PartitionRegistration.electionWasClean(next.leader, prevIsr, prevElr)) {
uncleanLeaderElection++;
}
if (PartitionRegistration.electionFromElr(next.leader, prevElr)) {
electionFromElrCounter++;
}
} }
globalPartitionsChange += delta(wasPresent, isPresent); globalPartitionsChange += delta(wasPresent, isPresent);
offlinePartitionsChange += delta(wasOffline, isOffline); offlinePartitionsChange += delta(wasOffline, isOffline);
@ -168,9 +170,9 @@ class ControllerMetricsChanges {
metrics.updateUncleanLeaderElection(uncleanLeaderElection); metrics.updateUncleanLeaderElection(uncleanLeaderElection);
uncleanLeaderElection = 0; uncleanLeaderElection = 0;
} }
if (electionFromElrCounter > 0) { if (electionFromElr > 0) {
metrics.updateElectionFromEligibleLeaderReplicasCount(electionFromElrCounter); metrics.updateElectionFromEligibleLeaderReplicasCount(electionFromElr);
electionFromElrCounter = 0; electionFromElr = 0;
} }
} }
} }

View File

@ -40,6 +40,8 @@ import java.util.stream.Collectors;
public final class TopicDelta { public final class TopicDelta {
private final TopicImage image; private final TopicImage image;
private final Map<Integer, PartitionRegistration> partitionChanges = new HashMap<>(); private final Map<Integer, PartitionRegistration> partitionChanges = new HashMap<>();
private Map<Integer, Integer> partitionToUncleanLeaderElectionCount = new HashMap<>();
private Map<Integer, Integer> partitionToElrElectionCount = new HashMap<>();
public TopicDelta(TopicImage image) { public TopicDelta(TopicImage image) {
this.image = image; this.image = image;
@ -69,20 +71,46 @@ public final class TopicDelta {
return image.id(); return image.id();
} }
public Map<Integer, Integer> partitionToElrElectionCount() {
return partitionToElrElectionCount;
}
public Map<Integer, Integer> partitionToUncleanLeaderElectionCount() {
return partitionToUncleanLeaderElectionCount;
}
public void replay(PartitionRecord record) { public void replay(PartitionRecord record) {
int partitionId = record.partitionId();
PartitionRegistration prevPartition = partitionChanges.get(partitionId);
if (prevPartition == null) {
prevPartition = image.partitions().get(partitionId);
}
if (prevPartition != null) {
updateElectionStats(partitionId, prevPartition, record.leader(), record.leaderRecoveryState());
}
partitionChanges.put(record.partitionId(), new PartitionRegistration(record)); partitionChanges.put(record.partitionId(), new PartitionRegistration(record));
} }
public void replay(PartitionChangeRecord record) { public void replay(PartitionChangeRecord record) {
PartitionRegistration partition = partitionChanges.get(record.partitionId()); int partitionId = record.partitionId();
if (partition == null) { PartitionRegistration prevPartition = partitionChanges.get(partitionId);
partition = image.partitions().get(record.partitionId()); if (prevPartition == null) {
if (partition == null) { prevPartition = image.partitions().get(partitionId);
if (prevPartition == null) {
throw new RuntimeException("Unable to find partition " + throw new RuntimeException("Unable to find partition " +
record.topicId() + ":" + record.partitionId()); record.topicId() + ":" + partitionId);
} }
} }
partitionChanges.put(record.partitionId(), partition.merge(record)); updateElectionStats(partitionId, prevPartition, record.leader(), record.leaderRecoveryState());
partitionChanges.put(record.partitionId(), prevPartition.merge(record));
}
private void updateElectionStats(int partitionId, PartitionRegistration prevPartition, int newLeader, byte newLeaderRecoveryState) {
if (PartitionRegistration.electionWasUnclean(newLeaderRecoveryState)) {
partitionToUncleanLeaderElectionCount.put(partitionId, partitionToUncleanLeaderElectionCount.getOrDefault(partitionId, 0) + 1);
}
if (Replicas.contains(prevPartition.elr, newLeader)) {
partitionToElrElectionCount.put(partitionId, partitionToElrElectionCount.getOrDefault(partitionId, 0) + 1);
}
} }
public void replay(ClearElrRecord record) { public void replay(ClearElrRecord record) {

View File

@ -33,7 +33,6 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
@ -165,12 +164,8 @@ public class PartitionRegistration {
public final int leaderEpoch; public final int leaderEpoch;
public final int partitionEpoch; public final int partitionEpoch;
public static boolean electionWasClean(int newLeader, int[] isr, int[] elr) { public static boolean electionWasUnclean(byte leaderRecoveryState) {
return newLeader == NO_LEADER || Replicas.contains(isr, newLeader) || Replicas.contains(elr, newLeader); return leaderRecoveryState == LeaderRecoveryState.RECOVERING.value();
}
public static boolean electionFromElr(int newLeader, int[] elr) {
return Replicas.contains(elr, newLeader);
} }
private static List<Uuid> checkDirectories(PartitionRecord record) { private static List<Uuid> checkDirectories(PartitionRecord record) {
@ -351,7 +346,7 @@ public class PartitionRegistration {
} }
public void maybeLogPartitionChange(Logger log, String description, PartitionRegistration prev) { public void maybeLogPartitionChange(Logger log, String description, PartitionRegistration prev) {
if (!electionWasClean(leader, prev.isr, prev.elr)) { if (electionWasUnclean(this.leaderRecoveryState.value())) {
log.info("UNCLEAN partition change for {}: {}", description, diff(prev)); log.info("UNCLEAN partition change for {}: {}", description, diff(prev));
} else if (log.isDebugEnabled()) { } else if (log.isDebugEnabled()) {
log.debug("partition change for {}: {}", description, diff(prev)); log.debug("partition change for {}: {}", description, diff(prev));

View File

@ -24,12 +24,14 @@ import org.apache.kafka.image.TopicDelta;
import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER; import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER;
@ -170,4 +172,19 @@ public class ControllerMetricsChangesTest {
assertEquals(0, changes.offlinePartitionsChange()); assertEquals(0, changes.offlinePartitionsChange());
assertEquals(1, changes.partitionsWithoutPreferredLeaderChange()); assertEquals(1, changes.partitionsWithoutPreferredLeaderChange());
} }
@Test
public void testTopicElectionResult() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
TopicImage image = new TopicImage("foo", FOO_ID, Map.of());
TopicDelta delta = new TopicDelta(image);
delta.replay(new PartitionRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2)));
delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(2).setIsr(List.of(2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()));
delta.replay(new PartitionRecord().setPartitionId(1).setLeader(-1).setIsr(List.of()).setEligibleLeaderReplicas(List.of(0, 1)).setReplicas(List.of(0, 1, 2)));
delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(1).setIsr(List.of(1)).setEligibleLeaderReplicas(List.of(0, 1)));
changes.handleTopicChange(image, delta);
assertEquals(1, changes.uncleanLeaderElection());
assertEquals(1, changes.electionFromElr());
}
} }

View File

@ -39,6 +39,7 @@ import org.junit.jupiter.api.Timeout;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -443,6 +444,43 @@ public class TopicsImageTest {
assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords)); assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords));
} }
@Test
public void testTopicDeltaElectionStatsWithEmptyImage() {
TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.EMPTY_MAP);
TopicDelta delta = new TopicDelta(image);
delta.replay(new PartitionRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2)));
delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(2).setIsr(List.of(2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()));
assertEquals(1, delta.partitionToUncleanLeaderElectionCount().get(0));
delta.replay(new PartitionChangeRecord().setPartitionId(0).setIsr(List.of(1, 2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()));
assertEquals(1, delta.partitionToUncleanLeaderElectionCount().get(0));
delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()));
assertEquals(2, delta.partitionToUncleanLeaderElectionCount().get(0));
delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(1).setIsr(List.of(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()));
assertEquals(3, delta.partitionToUncleanLeaderElectionCount().get(0));
assertTrue(delta.partitionToElrElectionCount().isEmpty());
delta.replay(new PartitionRecord().setPartitionId(1).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2)));
delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(-1).setIsr(List.of()).setEligibleLeaderReplicas(List.of(0, 1)));
assertTrue(delta.partitionToElrElectionCount().isEmpty());
delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(1).setIsr(List.of(1)).setEligibleLeaderReplicas(List.of(0, 1)));
assertEquals(1, delta.partitionToElrElectionCount().get(1));
}
@Test
public void testTopicDeltaElectionStatsWithNonEmptyImage() {
TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Map.of(
0, new PartitionRegistration(new PartitionRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2))),
1, new PartitionRegistration(new PartitionRecord().setPartitionId(1).setLeader(-1).setIsr(List.of()).setEligibleLeaderReplicas(List.of(0, 1)).setReplicas(List.of(0, 1, 2)))
));
TopicDelta delta = new TopicDelta(image);
delta.replay(new PartitionRecord().setPartitionId(0).setLeader(2).setIsr(List.of(2)).setReplicas(List.of(0, 1, 2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()));
assertEquals(1, delta.partitionToUncleanLeaderElectionCount().get(0));
assertTrue(delta.partitionToElrElectionCount().isEmpty());
delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(1).setIsr(List.of(1)).setEligibleLeaderReplicas(List.of(0, 1)));
assertEquals(1, delta.partitionToElrElectionCount().get(1));
}
@Test @Test
public void testLocalReassignmentChanges() { public void testLocalReassignmentChanges() {
int localId = 3; int localId = 3;

View File

@ -55,18 +55,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40) @Timeout(40)
public class PartitionRegistrationTest { public class PartitionRegistrationTest {
@Test @Test
public void testElectionWasClean() { public void testElectionWasUnclean() {
assertTrue(PartitionRegistration.electionWasClean(1, new int[]{1, 2}, new int[]{})); assertFalse(PartitionRegistration.electionWasUnclean(LeaderRecoveryState.RECOVERED.value()));
assertFalse(PartitionRegistration.electionWasClean(1, new int[]{0, 2}, new int[]{})); assertTrue(PartitionRegistration.electionWasUnclean(LeaderRecoveryState.RECOVERING.value()));
assertFalse(PartitionRegistration.electionWasClean(1, new int[]{}, new int[]{3, 4}));
assertTrue(PartitionRegistration.electionWasClean(3, new int[]{1, 2, 3, 4, 5, 6}, new int[]{}));
assertTrue(PartitionRegistration.electionWasClean(3, new int[]{}, new int[]{1, 2, 3}));
}
@Test
public void testEligibleLeaderReplicasElection() {
assertTrue(PartitionRegistration.electionFromElr(1, new int[]{1, 2}));
assertFalse(PartitionRegistration.electionFromElr(1, new int[]{0, 2}));
} }
@Test @Test