From 3094ce2c202c96c57084e82892df44b1f124c481 Mon Sep 17 00:00:00 2001 From: Calvin Liu <83986057+CalvinConfluent@users.noreply.github.com> Date: Wed, 7 May 2025 13:26:53 -0700 Subject: [PATCH] KAFKA-19212: Correct the unclean leader election metric calculation (#19590) The current ElectionWasClean checks if the new leader is in the previous ISR. However, there is a corner case in the partition reassignment. The partition reassignment can change the partition replicas. If the new preferred leader (the first one in the new replicas) is the last one to join ISR, this preferred leader will be elected in the same partition change. For example: In the previous state, the partition is Leader: 0, Replicas (2,1,0), ISR (1,0), Adding(2), removing(0). Then replica 2 joins the ISR. The new partition would be like: Leader: 2, Replicas (2,1), ISR(1,2). The new leader 2 is not in the previous ISR (1,0) but it is still a clean election. Reviewers: Jun Rao --- .../controller/PartitionChangeBuilder.java | 8 ++-- .../metrics/ControllerMetricsChanges.java | 30 +++++++------- .../org/apache/kafka/image/TopicDelta.java | 40 ++++++++++++++++--- .../kafka/metadata/PartitionRegistration.java | 11 ++--- .../metrics/ControllerMetricsChangesTest.java | 17 ++++++++ .../apache/kafka/image/TopicsImageTest.java | 38 ++++++++++++++++++ .../metadata/PartitionRegistrationTest.java | 15 ++----- 7 files changed, 115 insertions(+), 44 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java index 2fe49220018..ea196dfc741 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java @@ -334,11 +334,11 @@ public class PartitionChangeBuilder { targetIsr = List.of(electionResult.node); targetElr = targetElr.stream().filter(replica -> replica != electionResult.node) .collect(Collectors.toList()); - log.trace("Setting new leader for topicId {}, partition {} to {} using ELR", - topicId, partitionId, electionResult.node); + log.info("Setting new leader for topicId {}, partition {} to {} using ELR. Previous partition: {}, change record: {}", + topicId, partitionId, electionResult.node, partition, record); } else if (electionResult.unclean) { - log.info("Setting new leader for topicId {}, partition {} to {} using an unclean election", - topicId, partitionId, electionResult.node); + log.info("Setting new leader for topicId {}, partition {} to {} using an unclean election. Previous partition: {}, change record: {}", + topicId, partitionId, electionResult.node, partition, record); } else { log.trace("Setting new leader for topicId {}, partition {} to {} using a clean election", topicId, partitionId, electionResult.node); diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java index 843f779826e..53c77af57a8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java @@ -48,7 +48,7 @@ class ControllerMetricsChanges { private int offlinePartitionsChange = 0; private int partitionsWithoutPreferredLeaderChange = 0; private int uncleanLeaderElection = 0; - private int electionFromElrCounter = 0; + private int electionFromElr = 0; public int fencedBrokersChange() { return fencedBrokersChange; @@ -70,6 +70,14 @@ class ControllerMetricsChanges { return offlinePartitionsChange; } + public int uncleanLeaderElection() { + return uncleanLeaderElection; + } + + public int electionFromElr() { + return electionFromElr; + } + public int partitionsWithoutPreferredLeaderChange() { return partitionsWithoutPreferredLeaderChange; } @@ -105,10 +113,13 @@ class ControllerMetricsChanges { } else { for (Entry entry : topicDelta.partitionChanges().entrySet()) { int partitionId = entry.getKey(); + PartitionRegistration prevPartition = prev.partitions().get(partitionId); PartitionRegistration nextPartition = entry.getValue(); - handlePartitionChange(prev.partitions().get(partitionId), nextPartition); + handlePartitionChange(prevPartition, nextPartition); } } + topicDelta.partitionToUncleanLeaderElectionCount().forEach((partitionId, count) -> uncleanLeaderElection += count); + topicDelta.partitionToElrElectionCount().forEach((partitionId, count) -> electionFromElr += count); } void handlePartitionChange(PartitionRegistration prev, PartitionRegistration next) { @@ -127,15 +138,6 @@ class ControllerMetricsChanges { isPresent = true; isOffline = !next.hasLeader(); isWithoutPreferredLeader = !next.hasPreferredLeader(); - // take current all replicas as ISR if prev is null (new created partition), so we won't treat it as unclean election. - int[] prevIsr = prev != null ? prev.isr : next.replicas; - int[] prevElr = prev != null ? prev.elr : new int[]{}; - if (!PartitionRegistration.electionWasClean(next.leader, prevIsr, prevElr)) { - uncleanLeaderElection++; - } - if (PartitionRegistration.electionFromElr(next.leader, prevElr)) { - electionFromElrCounter++; - } } globalPartitionsChange += delta(wasPresent, isPresent); offlinePartitionsChange += delta(wasOffline, isOffline); @@ -168,9 +170,9 @@ class ControllerMetricsChanges { metrics.updateUncleanLeaderElection(uncleanLeaderElection); uncleanLeaderElection = 0; } - if (electionFromElrCounter > 0) { - metrics.updateElectionFromEligibleLeaderReplicasCount(electionFromElrCounter); - electionFromElrCounter = 0; + if (electionFromElr > 0) { + metrics.updateElectionFromEligibleLeaderReplicasCount(electionFromElr); + electionFromElr = 0; } } } diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java index fdb26e8e7df..8fc84c5e006 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java @@ -40,6 +40,8 @@ import java.util.stream.Collectors; public final class TopicDelta { private final TopicImage image; private final Map partitionChanges = new HashMap<>(); + private Map partitionToUncleanLeaderElectionCount = new HashMap<>(); + private Map partitionToElrElectionCount = new HashMap<>(); public TopicDelta(TopicImage image) { this.image = image; @@ -69,20 +71,46 @@ public final class TopicDelta { return image.id(); } + public Map partitionToElrElectionCount() { + return partitionToElrElectionCount; + } + public Map partitionToUncleanLeaderElectionCount() { + return partitionToUncleanLeaderElectionCount; + } + public void replay(PartitionRecord record) { + int partitionId = record.partitionId(); + PartitionRegistration prevPartition = partitionChanges.get(partitionId); + if (prevPartition == null) { + prevPartition = image.partitions().get(partitionId); + } + if (prevPartition != null) { + updateElectionStats(partitionId, prevPartition, record.leader(), record.leaderRecoveryState()); + } partitionChanges.put(record.partitionId(), new PartitionRegistration(record)); } public void replay(PartitionChangeRecord record) { - PartitionRegistration partition = partitionChanges.get(record.partitionId()); - if (partition == null) { - partition = image.partitions().get(record.partitionId()); - if (partition == null) { + int partitionId = record.partitionId(); + PartitionRegistration prevPartition = partitionChanges.get(partitionId); + if (prevPartition == null) { + prevPartition = image.partitions().get(partitionId); + if (prevPartition == null) { throw new RuntimeException("Unable to find partition " + - record.topicId() + ":" + record.partitionId()); + record.topicId() + ":" + partitionId); } } - partitionChanges.put(record.partitionId(), partition.merge(record)); + updateElectionStats(partitionId, prevPartition, record.leader(), record.leaderRecoveryState()); + partitionChanges.put(record.partitionId(), prevPartition.merge(record)); + } + + private void updateElectionStats(int partitionId, PartitionRegistration prevPartition, int newLeader, byte newLeaderRecoveryState) { + if (PartitionRegistration.electionWasUnclean(newLeaderRecoveryState)) { + partitionToUncleanLeaderElectionCount.put(partitionId, partitionToUncleanLeaderElectionCount.getOrDefault(partitionId, 0) + 1); + } + if (Replicas.contains(prevPartition.elr, newLeader)) { + partitionToElrElectionCount.put(partitionId, partitionToElrElectionCount.getOrDefault(partitionId, 0) + 1); + } } public void replay(ClearElrRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index a8f9e166e1f..f74ceded9c1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; -import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; @@ -165,12 +164,8 @@ public class PartitionRegistration { public final int leaderEpoch; public final int partitionEpoch; - public static boolean electionWasClean(int newLeader, int[] isr, int[] elr) { - return newLeader == NO_LEADER || Replicas.contains(isr, newLeader) || Replicas.contains(elr, newLeader); - } - - public static boolean electionFromElr(int newLeader, int[] elr) { - return Replicas.contains(elr, newLeader); + public static boolean electionWasUnclean(byte leaderRecoveryState) { + return leaderRecoveryState == LeaderRecoveryState.RECOVERING.value(); } private static List checkDirectories(PartitionRecord record) { @@ -351,7 +346,7 @@ public class PartitionRegistration { } public void maybeLogPartitionChange(Logger log, String description, PartitionRegistration prev) { - if (!electionWasClean(leader, prev.isr, prev.elr)) { + if (electionWasUnclean(this.leaderRecoveryState.value())) { log.info("UNCLEAN partition change for {}: {}", description, diff(prev)); } else if (log.isDebugEnabled()) { log.debug("partition change for {}: {}", description, diff(prev)); diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java index 1964c93c8b3..25bb5d85223 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java @@ -24,12 +24,14 @@ import org.apache.kafka.image.TopicDelta; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER; @@ -170,4 +172,19 @@ public class ControllerMetricsChangesTest { assertEquals(0, changes.offlinePartitionsChange()); assertEquals(1, changes.partitionsWithoutPreferredLeaderChange()); } + + @Test + public void testTopicElectionResult() { + ControllerMetricsChanges changes = new ControllerMetricsChanges(); + TopicImage image = new TopicImage("foo", FOO_ID, Map.of()); + TopicDelta delta = new TopicDelta(image); + delta.replay(new PartitionRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2))); + delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(2).setIsr(List.of(2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())); + + delta.replay(new PartitionRecord().setPartitionId(1).setLeader(-1).setIsr(List.of()).setEligibleLeaderReplicas(List.of(0, 1)).setReplicas(List.of(0, 1, 2))); + delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(1).setIsr(List.of(1)).setEligibleLeaderReplicas(List.of(0, 1))); + changes.handleTopicChange(image, delta); + assertEquals(1, changes.uncleanLeaderElection()); + assertEquals(1, changes.electionFromElr()); + } } diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index b91612d70ba..d50c5a4bcac 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -443,6 +444,43 @@ public class TopicsImageTest { assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords)); } + @Test + public void testTopicDeltaElectionStatsWithEmptyImage() { + TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.EMPTY_MAP); + TopicDelta delta = new TopicDelta(image); + delta.replay(new PartitionRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2))); + delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(2).setIsr(List.of(2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())); + assertEquals(1, delta.partitionToUncleanLeaderElectionCount().get(0)); + delta.replay(new PartitionChangeRecord().setPartitionId(0).setIsr(List.of(1, 2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())); + assertEquals(1, delta.partitionToUncleanLeaderElectionCount().get(0)); + delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())); + assertEquals(2, delta.partitionToUncleanLeaderElectionCount().get(0)); + delta.replay(new PartitionChangeRecord().setPartitionId(0).setLeader(1).setIsr(List.of(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())); + assertEquals(3, delta.partitionToUncleanLeaderElectionCount().get(0)); + assertTrue(delta.partitionToElrElectionCount().isEmpty()); + + delta.replay(new PartitionRecord().setPartitionId(1).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2))); + delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(-1).setIsr(List.of()).setEligibleLeaderReplicas(List.of(0, 1))); + assertTrue(delta.partitionToElrElectionCount().isEmpty()); + delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(1).setIsr(List.of(1)).setEligibleLeaderReplicas(List.of(0, 1))); + assertEquals(1, delta.partitionToElrElectionCount().get(1)); + } + + @Test + public void testTopicDeltaElectionStatsWithNonEmptyImage() { + TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Map.of( + 0, new PartitionRegistration(new PartitionRecord().setPartitionId(0).setLeader(0).setIsr(List.of(0, 1)).setReplicas(List.of(0, 1, 2))), + 1, new PartitionRegistration(new PartitionRecord().setPartitionId(1).setLeader(-1).setIsr(List.of()).setEligibleLeaderReplicas(List.of(0, 1)).setReplicas(List.of(0, 1, 2))) + )); + TopicDelta delta = new TopicDelta(image); + delta.replay(new PartitionRecord().setPartitionId(0).setLeader(2).setIsr(List.of(2)).setReplicas(List.of(0, 1, 2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())); + assertEquals(1, delta.partitionToUncleanLeaderElectionCount().get(0)); + assertTrue(delta.partitionToElrElectionCount().isEmpty()); + + delta.replay(new PartitionChangeRecord().setPartitionId(1).setLeader(1).setIsr(List.of(1)).setEligibleLeaderReplicas(List.of(0, 1))); + assertEquals(1, delta.partitionToElrElectionCount().get(1)); + } + @Test public void testLocalReassignmentChanges() { int localId = 3; diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index c0de7d2125d..80898123587 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -55,18 +55,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(40) public class PartitionRegistrationTest { @Test - public void testElectionWasClean() { - assertTrue(PartitionRegistration.electionWasClean(1, new int[]{1, 2}, new int[]{})); - assertFalse(PartitionRegistration.electionWasClean(1, new int[]{0, 2}, new int[]{})); - assertFalse(PartitionRegistration.electionWasClean(1, new int[]{}, new int[]{3, 4})); - assertTrue(PartitionRegistration.electionWasClean(3, new int[]{1, 2, 3, 4, 5, 6}, new int[]{})); - assertTrue(PartitionRegistration.electionWasClean(3, new int[]{}, new int[]{1, 2, 3})); - } - - @Test - public void testEligibleLeaderReplicasElection() { - assertTrue(PartitionRegistration.electionFromElr(1, new int[]{1, 2})); - assertFalse(PartitionRegistration.electionFromElr(1, new int[]{0, 2})); + public void testElectionWasUnclean() { + assertFalse(PartitionRegistration.electionWasUnclean(LeaderRecoveryState.RECOVERED.value())); + assertTrue(PartitionRegistration.electionWasUnclean(LeaderRecoveryState.RECOVERING.value())); } @Test