mirror of https://github.com/apache/kafka.git
KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
f50f13d781
commit
80ec8fbcd5
|
@ -20,6 +20,7 @@ package org.apache.kafka.controller;
|
|||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
import org.apache.kafka.timeline.TimelineHashMap;
|
||||
import org.apache.kafka.timeline.TimelineInteger;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -139,10 +140,13 @@ public class BrokersToIsrs {
|
|||
* Partitions with no isr members appear in this map under id NO_LEADER.
|
||||
*/
|
||||
private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;
|
||||
|
||||
private final TimelineInteger offlinePartitionCount;
|
||||
|
||||
BrokersToIsrs(SnapshotRegistry snapshotRegistry) {
|
||||
this.snapshotRegistry = snapshotRegistry;
|
||||
this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.offlinePartitionCount = new TimelineInteger(snapshotRegistry);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -163,6 +167,9 @@ public class BrokersToIsrs {
|
|||
} else {
|
||||
if (prevLeader == NO_LEADER) {
|
||||
prev = Replicas.copyWith(prevIsr, NO_LEADER);
|
||||
if (nextLeader != NO_LEADER) {
|
||||
offlinePartitionCount.decrement();
|
||||
}
|
||||
} else {
|
||||
prev = Replicas.clone(prevIsr);
|
||||
}
|
||||
|
@ -174,6 +181,9 @@ public class BrokersToIsrs {
|
|||
} else {
|
||||
if (nextLeader == NO_LEADER) {
|
||||
next = Replicas.copyWith(nextIsr, NO_LEADER);
|
||||
if (prevLeader != NO_LEADER) {
|
||||
offlinePartitionCount.increment();
|
||||
}
|
||||
} else {
|
||||
next = Replicas.clone(nextIsr);
|
||||
}
|
||||
|
@ -217,6 +227,9 @@ public class BrokersToIsrs {
|
|||
void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
|
||||
Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
|
||||
if (topicMap != null) {
|
||||
if (brokerId == NO_LEADER) {
|
||||
offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
|
||||
}
|
||||
topicMap.remove(topicId);
|
||||
}
|
||||
}
|
||||
|
@ -326,4 +339,8 @@ public class BrokersToIsrs {
|
|||
boolean hasLeaderships(int brokerId) {
|
||||
return iterator(brokerId, true).hasNext();
|
||||
}
|
||||
|
||||
int offlinePartitionCount() {
|
||||
return offlinePartitionCount.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,4 +34,12 @@ public interface ControllerMetrics {
|
|||
void setGlobalPartitionCount(int partitionCount);
|
||||
|
||||
int globalPartitionCount();
|
||||
|
||||
void setOfflinePartitionCount(int offlinePartitions);
|
||||
|
||||
int offlinePartitionCount();
|
||||
|
||||
void setPreferredReplicaImbalanceCount(int replicaImbalances);
|
||||
|
||||
int preferredReplicaImbalanceCount();
|
||||
}
|
||||
|
|
|
@ -34,13 +34,21 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
|
|||
"kafka.controller", "KafkaController", "GlobalTopicCount", null);
|
||||
private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName(
|
||||
"kafka.controller", "KafkaController", "GlobalPartitionCount", null);
|
||||
|
||||
private final static MetricName OFFLINE_PARTITION_COUNT = new MetricName(
|
||||
"kafka.controller", "KafkaController", "OfflinePartitionCount", null);
|
||||
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = new MetricName(
|
||||
"kafka.controller", "KafkaController", "PreferredReplicaImbalanceCount", null);
|
||||
|
||||
private volatile boolean active;
|
||||
private volatile int globalTopicCount;
|
||||
private volatile int globalPartitionCount;
|
||||
private volatile int offlinePartitionCount;
|
||||
private volatile int preferredReplicaImbalanceCount;
|
||||
private final Gauge<Integer> activeControllerCount;
|
||||
private final Gauge<Integer> globalPartitionCountGauge;
|
||||
private final Gauge<Integer> globalTopicCountGauge;
|
||||
private final Gauge<Integer> offlinePartitionCountGauge;
|
||||
private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
|
||||
private final Histogram eventQueueTime;
|
||||
private final Histogram eventQueueProcessingTime;
|
||||
|
||||
|
@ -48,6 +56,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
|
|||
this.active = false;
|
||||
this.globalTopicCount = 0;
|
||||
this.globalPartitionCount = 0;
|
||||
this.offlinePartitionCount = 0;
|
||||
this.preferredReplicaImbalanceCount = 0;
|
||||
this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
|
||||
@Override
|
||||
public Integer value() {
|
||||
|
@ -68,6 +78,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
|
|||
return globalPartitionCount;
|
||||
}
|
||||
});
|
||||
this.offlinePartitionCountGauge = registry.newGauge(OFFLINE_PARTITION_COUNT, new Gauge<Integer>() {
|
||||
@Override
|
||||
public Integer value() {
|
||||
return offlinePartitionCount;
|
||||
}
|
||||
});
|
||||
this.preferredReplicaImbalanceCountGauge = registry.newGauge(PREFERRED_REPLICA_IMBALANCE_COUNT, new Gauge<Integer>() {
|
||||
@Override
|
||||
public Integer value() {
|
||||
return preferredReplicaImbalanceCount;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,4 +131,24 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
|
|||
public int globalPartitionCount() {
|
||||
return this.globalPartitionCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOfflinePartitionCount(int offlinePartitions) {
|
||||
this.offlinePartitionCount = offlinePartitions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int offlinePartitionCount() {
|
||||
return this.offlinePartitionCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
|
||||
this.preferredReplicaImbalanceCount = replicaImbalances;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int preferredReplicaImbalanceCount() {
|
||||
return this.preferredReplicaImbalanceCount;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -227,6 +227,10 @@ public class ReplicationControlManager {
|
|||
return leader != NO_LEADER;
|
||||
}
|
||||
|
||||
boolean hasPreferredLeader() {
|
||||
return leader == preferredReplica();
|
||||
}
|
||||
|
||||
int preferredReplica() {
|
||||
return replicas.length == 0 ? NO_LEADER : replicas[0];
|
||||
}
|
||||
|
@ -285,6 +289,11 @@ public class ReplicationControlManager {
|
|||
*/
|
||||
private final TimelineInteger globalPartitionCount;
|
||||
|
||||
/**
|
||||
* A count of the number of partitions that do not have their first replica as a leader.
|
||||
*/
|
||||
private final TimelineInteger preferredReplicaImbalanceCount;
|
||||
|
||||
/**
|
||||
* A reference to the controller's configuration control manager.
|
||||
*/
|
||||
|
@ -330,6 +339,7 @@ public class ReplicationControlManager {
|
|||
this.controllerMetrics = controllerMetrics;
|
||||
this.clusterControl = clusterControl;
|
||||
this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
|
||||
this.preferredReplicaImbalanceCount = new TimelineInteger(snapshotRegistry);
|
||||
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
|
||||
|
@ -366,6 +376,11 @@ public class ReplicationControlManager {
|
|||
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
|
||||
newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
|
||||
}
|
||||
if (newPartInfo.leader != newPartInfo.preferredReplica()) {
|
||||
preferredReplicaImbalanceCount.increment();
|
||||
}
|
||||
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
|
||||
controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
|
||||
}
|
||||
|
||||
public void replay(PartitionChangeRecord record) {
|
||||
|
@ -387,6 +402,11 @@ public class ReplicationControlManager {
|
|||
String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +
|
||||
record.topicId();
|
||||
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);
|
||||
if (!newPartitionInfo.hasPreferredLeader() && prevPartitionInfo.hasPreferredLeader()) {
|
||||
preferredReplicaImbalanceCount.increment();
|
||||
}
|
||||
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
|
||||
controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
|
||||
}
|
||||
|
||||
public void replay(RemoveTopicRecord record) {
|
||||
|
@ -406,11 +426,17 @@ public class ReplicationControlManager {
|
|||
for (int i = 0; i < partition.isr.length; i++) {
|
||||
brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
|
||||
}
|
||||
if (partition.leader != partition.preferredReplica()) {
|
||||
preferredReplicaImbalanceCount.decrement();
|
||||
}
|
||||
globalPartitionCount.decrement();
|
||||
}
|
||||
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
|
||||
|
||||
controllerMetrics.setGlobalTopicsCount(topics.size());
|
||||
controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
|
||||
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
|
||||
controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
|
||||
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
|
||||
}
|
||||
|
||||
|
|
|
@ -21,11 +21,16 @@ public final class MockControllerMetrics implements ControllerMetrics {
|
|||
private volatile boolean active;
|
||||
private volatile int topics;
|
||||
private volatile int partitions;
|
||||
private volatile int offlinePartitions;
|
||||
private volatile int preferredReplicaImbalances;
|
||||
|
||||
|
||||
public MockControllerMetrics() {
|
||||
this.active = false;
|
||||
this.topics = 0;
|
||||
this.partitions = 0;
|
||||
this.offlinePartitions = 0;
|
||||
this.preferredReplicaImbalances = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -67,4 +72,24 @@ public final class MockControllerMetrics implements ControllerMetrics {
|
|||
public int globalPartitionCount() {
|
||||
return this.partitions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOfflinePartitionCount(int offlinePartitions) {
|
||||
this.offlinePartitions = offlinePartitions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int offlinePartitionCount() {
|
||||
return this.offlinePartitions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
|
||||
this.preferredReplicaImbalances = replicaImbalances;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int preferredReplicaImbalanceCount() {
|
||||
return this.preferredReplicaImbalances;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -253,6 +253,66 @@ public class ReplicationControlManagerTest {
|
|||
assertEquals(0, ctx.metrics.globalPartitionCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception {
|
||||
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
|
||||
ReplicationControlManager replicationControl = ctx.replicationControl;
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
registerBroker(i, ctx);
|
||||
unfenceBroker(i, ctx);
|
||||
}
|
||||
|
||||
CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][] {
|
||||
new int[] {0, 2}, new int[] {0, 1}});
|
||||
|
||||
CreatableTopicResult zar = ctx.createTestTopic("zar", new int[][] {
|
||||
new int[] {0, 1, 2}, new int[] {1, 2, 3}, new int[] {1, 2, 0}});
|
||||
|
||||
ControllerResult<Void> result = replicationControl.unregisterBroker(0);
|
||||
ctx.replay(result.records());
|
||||
|
||||
// All partitions should still be online after unregistering broker 0
|
||||
assertEquals(0, ctx.metrics.offlinePartitionCount());
|
||||
// Three partitions should not have their preferred (first) replica 0
|
||||
assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount());
|
||||
|
||||
result = replicationControl.unregisterBroker(1);
|
||||
ctx.replay(result.records());
|
||||
|
||||
// After unregistering broker 1, 1 partition for topic foo should go offline
|
||||
assertEquals(1, ctx.metrics.offlinePartitionCount());
|
||||
// All five partitions should not have their preferred (first) replica at this point
|
||||
assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount());
|
||||
|
||||
result = replicationControl.unregisterBroker(2);
|
||||
ctx.replay(result.records());
|
||||
|
||||
// After unregistering broker 2, the last partition for topic foo should go offline
|
||||
// and 2 partitions for topic zar should go offline
|
||||
assertEquals(4, ctx.metrics.offlinePartitionCount());
|
||||
|
||||
result = replicationControl.unregisterBroker(3);
|
||||
ctx.replay(result.records());
|
||||
|
||||
// After unregistering broker 3 the last partition for topic zar should go offline
|
||||
assertEquals(5, ctx.metrics.offlinePartitionCount());
|
||||
|
||||
// Deleting topic foo should bring the offline partition count down to 3
|
||||
ArrayList<ApiMessageAndVersion> records = new ArrayList<>();
|
||||
replicationControl.deleteTopic(foo.topicId(), records);
|
||||
ctx.replay(records);
|
||||
|
||||
assertEquals(3, ctx.metrics.offlinePartitionCount());
|
||||
|
||||
// Deleting topic zar should bring the offline partition count down to 0
|
||||
records = new ArrayList<>();
|
||||
replicationControl.deleteTopic(zar.topicId(), records);
|
||||
ctx.replay(records);
|
||||
|
||||
assertEquals(0, ctx.metrics.offlinePartitionCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateNewTopicNames() {
|
||||
Map<String, ApiError> topicErrors = new HashMap<>();
|
||||
|
|
Loading…
Reference in New Issue