diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java index dc5b5cf5b93..7a4fef9182e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java @@ -128,7 +128,8 @@ class ControllerMetricsChanges { isWithoutPreferredLeader = !next.hasPreferredLeader(); // take current all replicas as ISR if prev is null (new created partition), so we won't treat it as unclean election. int[] prevIsr = prev != null ? prev.isr : next.replicas; - if (!PartitionRegistration.electionWasClean(next.leader, prevIsr)) { + int[] prevElr = prev != null ? prev.elr : new int[]{}; + if (!PartitionRegistration.electionWasClean(next.leader, prevIsr, prevElr)) { uncleanLeaderElection++; } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index 458f069439b..29ed37bb81d 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -165,8 +165,8 @@ public class PartitionRegistration { public final int leaderEpoch; public final int partitionEpoch; - public static boolean electionWasClean(int newLeader, int[] isr) { - return newLeader == NO_LEADER || Replicas.contains(isr, newLeader); + public static boolean electionWasClean(int newLeader, int[] isr, int[] elr) { + return newLeader == NO_LEADER || Replicas.contains(isr, newLeader) || Replicas.contains(elr, newLeader); } private static List checkDirectories(PartitionRecord record) { @@ -347,7 +347,7 @@ public class PartitionRegistration { } public void maybeLogPartitionChange(Logger log, String description, PartitionRegistration prev) { - if (!electionWasClean(leader, prev.isr)) { + if (!electionWasClean(leader, prev.isr, prev.elr)) { log.info("UNCLEAN partition change for {}: {}", description, diff(prev)); } else if (log.isDebugEnabled()) { log.debug("partition change for {}: {}", description, diff(prev)); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index 8dc817f2621..7d15ea32ddb 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -58,10 +58,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class PartitionRegistrationTest { @Test public void testElectionWasClean() { - assertTrue(PartitionRegistration.electionWasClean(1, new int[]{1, 2})); - assertFalse(PartitionRegistration.electionWasClean(1, new int[]{0, 2})); - assertFalse(PartitionRegistration.electionWasClean(1, new int[]{})); - assertTrue(PartitionRegistration.electionWasClean(3, new int[]{1, 2, 3, 4, 5, 6})); + assertTrue(PartitionRegistration.electionWasClean(1, new int[]{1, 2}, new int[]{})); + assertFalse(PartitionRegistration.electionWasClean(1, new int[]{0, 2}, new int[]{})); + assertFalse(PartitionRegistration.electionWasClean(1, new int[]{}, new int[]{3, 4})); + assertTrue(PartitionRegistration.electionWasClean(3, new int[]{1, 2, 3, 4, 5, 6}, new int[]{})); + assertTrue(PartitionRegistration.electionWasClean(3, new int[]{}, new int[]{1, 2, 3})); } @Test