KAFKA-18954: Add ELR election rate metric (#19180)

Add a metric to track the number of election is done using ELR.
https://issues.apache.org/jira/browse/KAFKA-18954

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Justine Olshan
<jolshan@confluent.io>
This commit is contained in:
Calvin Liu 2025-03-20 15:37:49 -07:00 committed by GitHub
parent 71875ec58e
commit 1c582a4a35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 51 additions and 1 deletions

View File

@ -1528,6 +1528,11 @@ NodeId DirectoryId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTi
<td>kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec</td> <td>kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec</td>
<td>0</td> <td>0</td>
</tr> </tr>
<tr>
<td>Election from Eligible leader replicas rate</td>
<td>kafka.controller:type=ControllerStats,name=ElectionFromEligibleLeaderReplicasPerSec</td>
<td>0</td>
</tr>
<tr> <tr>
<td>Is controller active on broker</td> <td>Is controller active on broker</td>
<td>kafka.controller:type=KafkaController,name=ActiveControllerCount</td> <td>kafka.controller:type=KafkaController,name=ActiveControllerCount</td>

View File

@ -55,6 +55,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "MetadataErrorCount"); "KafkaController", "MetadataErrorCount");
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName( private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec"); "ControllerStats", "UncleanLeaderElectionsPerSec");
private static final MetricName ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC = getMetricName(
"ControllerStats", "ElectionFromEligibleLeaderReplicasPerSec");
private static final MetricName IGNORED_STATIC_VOTERS = getMetricName( private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters"); "KafkaController", "IgnoredStaticVoters");
@ -67,6 +69,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0); private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0);
private final AtomicInteger metadataErrorCount = new AtomicInteger(0); private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty(); private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
private Optional<Meter> electionFromEligibleLeaderReplicasMeter = Optional.empty();
private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false); private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
/** /**
@ -120,6 +123,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
})); }));
registry.ifPresent(r -> uncleanLeaderElectionMeter = registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS))); Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));
registry.ifPresent(r -> electionFromEligibleLeaderReplicasMeter =
Optional.of(registry.get().newMeter(ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC, "elections", TimeUnit.SECONDS)));
registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Integer>() { registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Integer>() {
@Override @Override
@ -213,6 +218,10 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count)); this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
} }
public void updateElectionFromEligibleLeaderReplicasCount(int count) {
this.electionFromEligibleLeaderReplicasMeter.ifPresent(m -> m.mark(count));
}
public void setIgnoredStaticVoters(boolean ignored) { public void setIgnoredStaticVoters(boolean ignored) {
ignoredStaticVoters.set(ignored); ignoredStaticVoters.set(ignored);
} }
@ -232,6 +241,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
PREFERRED_REPLICA_IMBALANCE_COUNT, PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT, METADATA_ERROR_COUNT,
UNCLEAN_LEADER_ELECTIONS_PER_SEC, UNCLEAN_LEADER_ELECTIONS_PER_SEC,
ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
IGNORED_STATIC_VOTERS IGNORED_STATIC_VOTERS
).forEach(r::removeMetric)); ).forEach(r::removeMetric));
} }

View File

@ -48,6 +48,7 @@ class ControllerMetricsChanges {
private int offlinePartitionsChange = 0; private int offlinePartitionsChange = 0;
private int partitionsWithoutPreferredLeaderChange = 0; private int partitionsWithoutPreferredLeaderChange = 0;
private int uncleanLeaderElection = 0; private int uncleanLeaderElection = 0;
private int electionFromElrCounter = 0;
public int fencedBrokersChange() { public int fencedBrokersChange() {
return fencedBrokersChange; return fencedBrokersChange;
@ -132,6 +133,9 @@ class ControllerMetricsChanges {
if (!PartitionRegistration.electionWasClean(next.leader, prevIsr, prevElr)) { if (!PartitionRegistration.electionWasClean(next.leader, prevIsr, prevElr)) {
uncleanLeaderElection++; uncleanLeaderElection++;
} }
if (PartitionRegistration.electionFromElr(next.leader, prevElr)) {
electionFromElrCounter++;
}
} }
globalPartitionsChange += delta(wasPresent, isPresent); globalPartitionsChange += delta(wasPresent, isPresent);
offlinePartitionsChange += delta(wasOffline, isOffline); offlinePartitionsChange += delta(wasOffline, isOffline);
@ -164,5 +168,9 @@ class ControllerMetricsChanges {
metrics.updateUncleanLeaderElection(uncleanLeaderElection); metrics.updateUncleanLeaderElection(uncleanLeaderElection);
uncleanLeaderElection = 0; uncleanLeaderElection = 0;
} }
if (electionFromElrCounter > 0) {
metrics.updateElectionFromEligibleLeaderReplicasCount(electionFromElrCounter);
electionFromElrCounter = 0;
}
} }
} }

View File

@ -169,6 +169,10 @@ public class PartitionRegistration {
return newLeader == NO_LEADER || Replicas.contains(isr, newLeader) || Replicas.contains(elr, newLeader); return newLeader == NO_LEADER || Replicas.contains(isr, newLeader) || Replicas.contains(elr, newLeader);
} }
public static boolean electionFromElr(int newLeader, int[] elr) {
return Replicas.contains(elr, newLeader);
}
private static List<Uuid> checkDirectories(PartitionRecord record) { private static List<Uuid> checkDirectories(PartitionRecord record) {
if (record.directories() != null && !record.directories().isEmpty() && record.replicas().size() != record.directories().size()) { if (record.directories() != null && !record.directories().isEmpty() && record.replicas().size() != record.directories().size()) {
throw new InvalidReplicaDirectoriesException(record); throw new InvalidReplicaDirectoriesException(record);

View File

@ -49,7 +49,8 @@ public class ControllerMetadataMetricsTest {
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount", "kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount", "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
"kafka.controller:type=KafkaController,name=IgnoredStaticVoters", "kafka.controller:type=KafkaController,name=IgnoredStaticVoters",
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec" "kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec",
"kafka.controller:type=ControllerStats,name=ElectionFromEligibleLeaderReplicasPerSec"
))); )));
} }
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "KafkaController", ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "KafkaController",
@ -192,6 +193,22 @@ public class ControllerMetadataMetricsTest {
} }
} }
@SuppressWarnings("LocalVariableName")
@Test
public void testUpdateElectionFromEligibleLeaderReplicasCount() {
MetricsRegistry registry = new MetricsRegistry();
try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) {
Meter ElectionFromEligibleLeaderReplicasPerSec = (Meter) registry
.allMetrics()
.get(metricName("ControllerStats", "ElectionFromEligibleLeaderReplicasPerSec"));
assertEquals(0, ElectionFromEligibleLeaderReplicasPerSec.count());
metrics.updateElectionFromEligibleLeaderReplicasCount(2);
assertEquals(2, ElectionFromEligibleLeaderReplicasPerSec.count());
} finally {
registry.shutdown();
}
}
@Test @Test
public void testIgnoredStaticVoters() { public void testIgnoredStaticVoters() {
MetricsRegistry registry = new MetricsRegistry(); MetricsRegistry registry = new MetricsRegistry();

View File

@ -63,6 +63,12 @@ public class PartitionRegistrationTest {
assertTrue(PartitionRegistration.electionWasClean(3, new int[]{}, new int[]{1, 2, 3})); assertTrue(PartitionRegistration.electionWasClean(3, new int[]{}, new int[]{1, 2, 3}));
} }
@Test
public void testEligibleLeaderReplicasElection() {
assertTrue(PartitionRegistration.electionFromElr(1, new int[]{1, 2}));
assertFalse(PartitionRegistration.electionFromElr(1, new int[]{0, 2}));
}
@Test @Test
public void testPartitionControlInfoMergeAndDiff() { public void testPartitionControlInfoMergeAndDiff() {
PartitionRegistration a = new PartitionRegistration.Builder(). PartitionRegistration a = new PartitionRegistration.Builder().