KAFKA-18666: Controller-side monitoring for broker shutdown and startup (#19586)
CI / build (push) Waiting to run Details

This PR introduces the following per-broker metrics:

-`kafka.controller:type=KafkaController,name=BrokerRegistrationState,broker=X`

-`kafka.controller:type=KafkaController,name=TimeSinceLastHeartbeatReceivedMs,broker=X`

and this metric:

`kafka.controller:type=KafkaController,name=ControlledShutdownBrokerCount`

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Kevin Wu 2025-05-14 12:59:47 -05:00 committed by GitHub
parent cbfbbe833d
commit 37963256d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 445 additions and 22 deletions

View File

@ -224,7 +224,7 @@ class ControllerServer(
val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time)
quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.brokerSessionTimeoutMs)
new QuorumController.Builder(config.nodeId, sharedServer.clusterId).
setTime(time).

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
@ -92,6 +93,7 @@ public class ClusterControlManager {
private FeatureControlManager featureControl = null;
private BrokerShutdownHandler brokerShutdownHandler = null;
private String interBrokerListenerName = "PLAINTEXT";
private QuorumControllerMetrics metrics = null;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
@ -138,6 +140,11 @@ public class ClusterControlManager {
return this;
}
Builder setMetrics(QuorumControllerMetrics metrics) {
this.metrics = metrics;
return this;
}
ClusterControlManager build() {
if (logContext == null) {
logContext = new LogContext();
@ -157,6 +164,9 @@ public class ClusterControlManager {
if (brokerShutdownHandler == null) {
throw new RuntimeException("You must specify BrokerShutdownHandler");
}
if (metrics == null) {
metrics = new QuorumControllerMetrics(Optional.empty(), time, 0);
}
return new ClusterControlManager(logContext,
clusterId,
time,
@ -165,7 +175,8 @@ public class ClusterControlManager {
replicaPlacer,
featureControl,
brokerShutdownHandler,
interBrokerListenerName
interBrokerListenerName,
metrics
);
}
}
@ -269,6 +280,11 @@ public class ClusterControlManager {
*/
private final TimelineHashMap<Uuid, Integer> directoryToBroker;
/**
* Manages the kafka.controller:type=KafkaController,name=TimeSinceLastHeartbeatReceivedMs,broker=<brokerId> metrics.
*/
private final QuorumControllerMetrics metrics;
private ClusterControlManager(
LogContext logContext,
String clusterId,
@ -278,7 +294,8 @@ public class ClusterControlManager {
ReplicaPlacer replicaPlacer,
FeatureControlManager featureControl,
BrokerShutdownHandler brokerShutdownHandler,
String interBrokerListenerName
String interBrokerListenerName,
QuorumControllerMetrics metrics
) {
this.logContext = logContext;
this.clusterId = clusterId;
@ -295,6 +312,7 @@ public class ClusterControlManager {
this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokerShutdownHandler = brokerShutdownHandler;
this.interBrokerListenerName = interBrokerListenerName;
this.metrics = metrics;
}
ReplicaPlacer replicaPlacer() {
@ -308,10 +326,12 @@ public class ClusterControlManager {
heartbeatManager = new BrokerHeartbeatManager(logContext, time, sessionTimeoutNs);
long nowNs = time.nanoseconds();
for (BrokerRegistration registration : brokerRegistrations.values()) {
heartbeatManager.register(registration.id(), registration.fenced());
int brokerId = registration.id();
heartbeatManager.register(brokerId, registration.fenced());
metrics.addTimeSinceLastHeartbeatMetric(brokerId);
if (!registration.fenced()) {
heartbeatManager.tracker().updateContactTime(
new BrokerIdAndEpoch(registration.id(), registration.epoch()), nowNs);
new BrokerIdAndEpoch(brokerId, registration.epoch()), nowNs);
}
}
}
@ -325,6 +345,7 @@ public class ClusterControlManager {
*/
public void deactivate() {
heartbeatManager = null;
metrics.removeTimeSinceLastHeartbeatMetrics();
}
Map<Integer, BrokerRegistration> brokerRegistrations() {
@ -431,6 +452,7 @@ public class ClusterControlManager {
log.info("No previous registration found for broker {}. New incarnation ID is " +
"{}. Generated {} record(s) to clean up previous incarnations. New broker " +
"epoch is {}.", brokerId, request.incarnationId(), numRecordsAdded, newBrokerEpoch);
metrics.addTimeSinceLastHeartbeatMetric(brokerId);
} else {
log.info("Registering a new incarnation of broker {}. Previous incarnation ID " +
"was {}; new incarnation ID is {}. Generated {} record(s) to clean up " +

View File

@ -75,7 +75,7 @@ class OffsetControlManager {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (metrics == null) {
metrics = new QuorumControllerMetrics(Optional.empty(), time);
metrics = new QuorumControllerMetrics(Optional.empty(), time, 0);
}
return new OffsetControlManager(logContext,
snapshotRegistry,

View File

@ -406,7 +406,7 @@ public final class QuorumController implements Controller {
logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
}
if (controllerMetrics == null) {
controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time);
controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time, 0);
}
KafkaEventQueue queue = null;
@ -1531,6 +1531,7 @@ public final class QuorumController implements Controller {
setFeatureControlManager(featureControl).
setBrokerShutdownHandler(this::handleBrokerShutdown).
setInterBrokerListenerName(interBrokerListenerName).
setMetrics(controllerMetrics).
build();
this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext).
@ -1781,6 +1782,7 @@ public final class QuorumController implements Controller {
ControllerRequestContext context,
int brokerId
) {
controllerMetrics.removeTimeSinceLastHeartbeatMetric(brokerId);
return appendWriteEvent("unregisterBroker", context.deadlineNs(),
() -> replicationControl.unregisterBroker(brokerId),
EnumSet.noneOf(ControllerOperationFlag.class));
@ -1928,6 +1930,7 @@ public final class QuorumController implements Controller {
ControllerRequestContext context,
BrokerHeartbeatRequestData request
) {
controllerMetrics.updateBrokerContactTime(request.brokerId());
// We start by updating the broker heartbeat in a lockless data structure.
// We do this first so that if the main controller thread is backlogged, the
// last contact time update still gets through.

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.metadata.BrokerRegistration;
@InterfaceStability.Stable
public enum BrokerRegistrationState {
UNREGISTERED(-1),
FENCED(10),
CONTROLLED_SHUTDOWN(20),
ACTIVE(30);
private final int state;
BrokerRegistrationState(int state) {
this.state = state;
}
public int state() {
return state;
}
public static BrokerRegistrationState getBrokerRegistrationState(BrokerRegistration brokerRegistration) {
if (brokerRegistration.fenced()) {
return BrokerRegistrationState.FENCED;
} else if (brokerRegistration.inControlledShutdown()) {
return BrokerRegistrationState.CONTROLLED_SHUTDOWN;
} else {
return BrokerRegistrationState.ACTIVE;
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.controller.metrics;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import com.yammer.metrics.core.Gauge;
@ -24,12 +25,17 @@ import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.controller.metrics.BrokerRegistrationState.getBrokerRegistrationState;
/**
* These are the metrics which are managed by the ControllerServer class. They generally pertain to
* aspects of the metadata, like how many topics or partitions we have.
@ -43,6 +49,11 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "FencedBrokerCount");
private static final MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private static final MetricName CONTROLLED_SHUTDOWN_BROKER_COUNT = getMetricName(
"KafkaController", "ControlledShutdownBrokerCount"
);
private static final String BROKER_REGISTRATION_STATE_METRIC_NAME = "BrokerRegistrationState";
private static final String BROKER_ID_TAG = "broker";
private static final MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private static final MetricName GLOBAL_PARTITION_COUNT = getMetricName(
@ -63,6 +74,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
private final AtomicInteger controllerShutdownBrokerCount = new AtomicInteger(0);
private final Map<Integer, AtomicInteger> brokerRegistrationStates = new ConcurrentHashMap<>();
private final AtomicInteger globalTopicCount = new AtomicInteger(0);
private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
@ -91,6 +104,12 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
return activeBrokerCount();
}
}));
registry.ifPresent(r -> r.newGauge(CONTROLLED_SHUTDOWN_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return controlledShutdownBrokerCount();
}
}));
registry.ifPresent(r -> r.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
@ -134,6 +153,36 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
}));
}
public void addBrokerRegistrationStateMetric(int brokerId) {
registry.ifPresent(r -> r.newGauge(
getBrokerIdTagMetricName(
"KafkaController",
BROKER_REGISTRATION_STATE_METRIC_NAME,
brokerId
),
new Gauge<Integer>() {
@Override
public Integer value() {
return brokerRegistrationState(brokerId);
}
}
));
}
public void removeBrokerRegistrationStateMetric(int brokerId) {
registry.ifPresent(r -> r.removeMetric(
getBrokerIdTagMetricName(
"KafkaController",
BROKER_REGISTRATION_STATE_METRIC_NAME,
brokerId
)
));
}
public Optional<MetricsRegistry> registry() {
return registry;
}
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokerCount.set(brokerCount);
}
@ -157,6 +206,40 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
public int activeBrokerCount() {
return this.activeBrokerCount.get();
}
public void setControlledShutdownBrokerCount(int brokerCount) {
this.controllerShutdownBrokerCount.set(brokerCount);
}
public void addToControlledShutdownBrokerCount(int brokerCountDelta) {
this.controllerShutdownBrokerCount.addAndGet(brokerCountDelta);
}
public int controlledShutdownBrokerCount() {
return this.controllerShutdownBrokerCount.get();
}
public void setBrokerRegistrationState(int brokerId, BrokerRegistration brokerRegistration) {
// if the broker is unregistered, remove the metric and state
if (brokerRegistration == null) {
removeBrokerRegistrationStateMetric(brokerId);
brokerRegistrationStates.remove(brokerId);
return;
}
BrokerRegistrationState brokerState = getBrokerRegistrationState(brokerRegistration);
if (brokerRegistrationStates.containsKey(brokerId)) {
brokerRegistrationStates.get(brokerId).set(brokerState.state());
} else {
brokerRegistrationStates.put(brokerId, new AtomicInteger(brokerState.state()));
}
}
public int brokerRegistrationState(int brokerId) {
return this.brokerRegistrationStates.getOrDefault(
brokerId,
new AtomicInteger(BrokerRegistrationState.UNREGISTERED.state())
).get();
}
public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount.set(topicCount);
@ -235,6 +318,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
registry.ifPresent(r -> List.of(
FENCED_BROKER_COUNT,
ACTIVE_BROKER_COUNT,
CONTROLLED_SHUTDOWN_BROKER_COUNT,
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
@ -244,9 +328,18 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
IGNORED_STATIC_VOTERS
).forEach(r::removeMetric));
for (int brokerId : brokerRegistrationStates.keySet()) {
removeBrokerRegistrationStateMetric(brokerId);
}
}
private static MetricName getMetricName(String type, String name) {
return KafkaYammerMetrics.getMetricName("kafka.controller", type, name);
}
private static MetricName getBrokerIdTagMetricName(String type, String name, int brokerId) {
LinkedHashMap<String, String> brokerIdTag = new LinkedHashMap<>();
brokerIdTag.put(BROKER_ID_TAG, Integer.toString(brokerId));
return KafkaYammerMetrics.getMetricName("kafka.controller", type, name, brokerIdTag);
}
}

View File

@ -93,8 +93,11 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
if (delta.clusterDelta() != null) {
for (Entry<Integer, Optional<BrokerRegistration>> entry :
delta.clusterDelta().changedBrokers().entrySet()) {
changes.handleBrokerChange(prevImage.cluster().brokers().get(entry.getKey()),
entry.getValue().orElse(null));
changes.handleBrokerChange(
prevImage.cluster().brokers().get(entry.getKey()),
entry.getValue().orElse(null),
metrics
);
}
}
if (delta.topicsDelta() != null) {
@ -117,15 +120,20 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
metrics.setGlobalTopicCount(newImage.topics().topicsById().size());
int fencedBrokers = 0;
int activeBrokers = 0;
int controlledShutdownBrokers = 0;
for (BrokerRegistration broker : newImage.cluster().brokers().values()) {
if (broker.fenced()) {
fencedBrokers++;
} else {
activeBrokers++;
}
if (broker.inControlledShutdown()) {
controlledShutdownBrokers++;
}
}
metrics.setFencedBrokerCount(fencedBrokers);
metrics.setActiveBrokerCount(activeBrokers);
metrics.setControlledShutdownBrokerCount(controlledShutdownBrokers);
int totalPartitions = 0;
int offlinePartitions = 0;

View File

@ -29,6 +29,7 @@ import java.util.Map.Entry;
* The ControllerMetricsChanges class is used inside ControllerMetricsPublisher to track the
* metrics changes triggered by a series of deltas.
*/
@SuppressWarnings("NPathComplexity")
class ControllerMetricsChanges {
/**
* Calculates the change between two boolean values, expressed as an integer.
@ -43,6 +44,7 @@ class ControllerMetricsChanges {
private int fencedBrokersChange = 0;
private int activeBrokersChange = 0;
private int controlledShutdownBrokersChange = 0;
private int globalTopicsChange = 0;
private int globalPartitionsChange = 0;
private int offlinePartitionsChange = 0;
@ -58,6 +60,10 @@ class ControllerMetricsChanges {
return activeBrokersChange;
}
public int controlledShutdownBrokersChange() {
return controlledShutdownBrokersChange;
}
public int globalTopicsChange() {
return globalTopicsChange;
}
@ -82,21 +88,33 @@ class ControllerMetricsChanges {
return partitionsWithoutPreferredLeaderChange;
}
void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) {
void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next, ControllerMetadataMetrics metrics) {
boolean wasFenced = false;
boolean wasActive = false;
boolean wasInControlledShutdown = false;
if (prev != null) {
wasFenced = prev.fenced();
wasActive = !prev.fenced();
wasInControlledShutdown = prev.inControlledShutdown();
} else {
metrics.addBrokerRegistrationStateMetric(next.id());
}
boolean isFenced = false;
boolean isActive = false;
boolean isInControlledShutdown = false;
final int brokerId;
if (next != null) {
isFenced = next.fenced();
isActive = !next.fenced();
isInControlledShutdown = next.inControlledShutdown();
brokerId = next.id();
} else {
brokerId = prev.id();
}
metrics.setBrokerRegistrationState(brokerId, next);
fencedBrokersChange += delta(wasFenced, isFenced);
activeBrokersChange += delta(wasActive, isActive);
controlledShutdownBrokersChange += delta(wasInControlledShutdown, isInControlledShutdown);
}
void handleDeletedTopic(TopicImage deletedTopic) {
@ -154,6 +172,9 @@ class ControllerMetricsChanges {
if (activeBrokersChange != 0) {
metrics.addToActiveBrokerCount(activeBrokersChange);
}
if (controlledShutdownBrokersChange != 0) {
metrics.addToControlledShutdownBrokerCount(controlledShutdownBrokersChange);
}
if (globalTopicsChange != 0) {
metrics.addToGlobalTopicCount(globalTopicsChange);
}

View File

@ -25,8 +25,11 @@ import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@ -61,8 +64,11 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";
private final Optional<MetricsRegistry> registry;
private final Time time;
private volatile boolean active;
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
@ -74,6 +80,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong operationsStarted = new AtomicLong(0);
private final AtomicLong operationsTimedOut = new AtomicLong(0);
private final AtomicLong newActiveControllers = new AtomicLong(0);
private final Map<Integer, AtomicLong> brokerContactTimesMs = new ConcurrentHashMap<>();
private final int sessionTimeoutMs;
private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
@ -86,9 +94,11 @@ public class QuorumControllerMetrics implements AutoCloseable {
public QuorumControllerMetrics(
Optional<MetricsRegistry> registry,
Time time
Time time,
int sessionTimeoutMs
) {
this.registry = registry;
this.time = time;
this.active = false;
registry.ifPresent(r -> r.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
@Override
@ -98,6 +108,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
}));
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
@ -148,6 +159,41 @@ public class QuorumControllerMetrics implements AutoCloseable {
}));
}
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
brokerContactTimesMs.put(brokerId, new AtomicLong(time.milliseconds()));
registry.ifPresent(r -> r.newGauge(
getBrokerIdTagMetricName(
"KafkaController",
TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME,
brokerId
),
new Gauge<Integer>() {
@Override
public Integer value() {
return timeSinceLastHeartbeatMs(brokerId);
}
}
));
}
public void removeTimeSinceLastHeartbeatMetric(int brokerId) {
registry.ifPresent(r -> r.removeMetric(
getBrokerIdTagMetricName(
"KafkaController",
TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME,
brokerId
)
));
brokerContactTimesMs.remove(brokerId);
}
public void removeTimeSinceLastHeartbeatMetrics() {
for (int brokerId : brokerContactTimesMs.keySet()) {
removeTimeSinceLastHeartbeatMetric(brokerId);
}
brokerContactTimesMs.clear();
}
public void setActive(boolean active) {
this.active = active;
}
@ -220,6 +266,21 @@ public class QuorumControllerMetrics implements AutoCloseable {
return newActiveControllers.get();
}
public void updateBrokerContactTime(int brokerId) {
brokerContactTimesMs.putIfAbsent(brokerId, new AtomicLong(time.milliseconds()));
}
public int timeSinceLastHeartbeatMs(int brokerId) {
if (!brokerContactTimesMs.containsKey(brokerId)) {
return sessionTimeoutMs;
} else {
return Math.min(
(int) (time.milliseconds() - brokerContactTimesMs.get(brokerId).get()),
sessionTimeoutMs
);
}
}
@Override
public void close() {
registry.ifPresent(r -> List.of(
@ -235,9 +296,16 @@ public class QuorumControllerMetrics implements AutoCloseable {
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
private static MetricName getMetricName(String type, String name) {
return KafkaYammerMetrics.getMetricName("kafka.controller", type, name);
}
private static MetricName getBrokerIdTagMetricName(String type, String name, int brokerId) {
LinkedHashMap<String, String> brokerIdTag = new LinkedHashMap<>();
brokerIdTag.put(BROKER_ID_TAG, Integer.toString(brokerId));
return KafkaYammerMetrics.getMetricName("kafka.controller", type, name, brokerIdTag);
}
}

View File

@ -57,7 +57,7 @@ public class QuorumControllerMetricsIntegrationTest {
final AtomicBoolean closed = new AtomicBoolean(false);
MockControllerMetrics() {
super(Optional.empty(), Time.SYSTEM);
super(Optional.empty(), Time.SYSTEM, 0);
}
@Override

View File

@ -17,6 +17,8 @@
package org.apache.kafka.controller.metrics;
import org.apache.kafka.metadata.BrokerRegistration;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
@ -32,6 +34,7 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ControllerMetadataMetricsTest {
@Test
@ -39,10 +42,23 @@ public class ControllerMetadataMetricsTest {
MetricsRegistry registry = new MetricsRegistry();
try {
try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) {
metrics.addBrokerRegistrationStateMetric(0);
metrics.setBrokerRegistrationState(
1,
brokerRegistration(false, false)
);
metrics.addBrokerRegistrationStateMetric(1);
metrics.setBrokerRegistrationState(
2,
brokerRegistration(false, false)
);
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller:",
new HashSet<>(List.of(
"kafka.controller:type=KafkaController,name=ActiveBrokerCount",
"kafka.controller:type=KafkaController,name=FencedBrokerCount",
"kafka.controller:type=KafkaController,name=ControlledShutdownBrokerCount",
"kafka.controller:type=KafkaController,name=BrokerRegistrationState,broker=0",
"kafka.controller:type=KafkaController,name=BrokerRegistrationState,broker=1",
"kafka.controller:type=KafkaController,name=GlobalPartitionCount",
"kafka.controller:type=KafkaController,name=GlobalTopicCount",
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
@ -81,6 +97,11 @@ public class ControllerMetadataMetricsTest {
return new MetricName("kafka.controller", type, name, null, mBeanName);
}
private static MetricName metricName(String type, String name, String scope) {
String mBeanName = String.format("kafka.controller:type=%s,name=%s,%s", type, name, scope);
return new MetricName("kafka.controller", type, name, scope, mBeanName);
}
private void testIntGaugeMetric(
Function<ControllerMetadataMetrics, Integer> metricsGetter,
Function<MetricsRegistry, Integer> registryGetter,
@ -129,6 +150,52 @@ public class ControllerMetadataMetricsTest {
);
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testControlledShutdownCountMetric() {
testIntGaugeMetric(
m -> m.controlledShutdownBrokerCount(),
registry -> ((Gauge<Integer>) registry.allMetrics().
get(metricName("KafkaController", "ControlledShutdownBrokerCount"))).value(),
(m, v) -> m.setControlledShutdownBrokerCount(v),
(m, v) -> m.addToControlledShutdownBrokerCount(v)
);
}
@Test
public void testBrokerRegistrationStateMetrics() {
MetricsRegistry registry = new MetricsRegistry();
try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) {
int brokerId = 1;
MetricName name = metricName("KafkaController", "BrokerRegistrationState", "broker=1");
metrics.addBrokerRegistrationStateMetric(brokerId);
Gauge<Integer> registrationState = (Gauge<Integer>) registry.allMetrics().get(name);
metrics.setBrokerRegistrationState(brokerId, brokerRegistration(false, false));
assertEquals(BrokerRegistrationState.ACTIVE.state(), registrationState.value());
metrics.setBrokerRegistrationState(brokerId, brokerRegistration(true, false));
assertEquals(BrokerRegistrationState.FENCED.state(), registrationState.value());
metrics.setBrokerRegistrationState(brokerId, brokerRegistration(false, true));
assertEquals(BrokerRegistrationState.CONTROLLED_SHUTDOWN.state(), registrationState.value());
metrics.setBrokerRegistrationState(brokerId, null);
assertEquals(BrokerRegistrationState.UNREGISTERED.state(), registrationState.value());
assertNull(registry.allMetrics().get(name));
} finally {
registry.shutdown();
}
}
private BrokerRegistration brokerRegistration(boolean fenced, boolean inControlledShutdown) {
return new BrokerRegistration.Builder()
.setFenced(fenced)
.setInControlledShutdown(inControlledShutdown)
.build();
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testGlobalTopicCountMetric() {

View File

@ -28,11 +28,14 @@ import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.MetadataVersion;
import com.yammer.metrics.core.MetricsRegistry;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NORMAL;
@ -52,13 +55,22 @@ public class ControllerMetricsChangesTest {
private static BrokerRegistration brokerRegistration(
int brokerId,
boolean fenced
) {
return brokerRegistration(brokerId, fenced, false);
}
private static BrokerRegistration brokerRegistration(
int brokerId,
boolean fenced,
boolean controlledShutdown
) {
return new BrokerRegistration.Builder().
setId(brokerId).
setEpoch(100L).
setIncarnationId(Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ")).
setFenced(fenced).
setInControlledShutdown(false).build();
setInControlledShutdown(controlledShutdown).
build();
}
@Test
@ -75,33 +87,80 @@ public class ControllerMetricsChangesTest {
@Test
public void testHandleNewUnfencedBroker() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(null, brokerRegistration(1, false));
ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
int brokerId = 1;
changes.handleBrokerChange(null, brokerRegistration(brokerId, false), metrics);
assertEquals(0, changes.fencedBrokersChange());
assertEquals(1, changes.activeBrokersChange());
assertEquals(BrokerRegistrationState.ACTIVE.state(), metrics.brokerRegistrationState(brokerId));
}
@Test
public void testHandleNewFencedBroker() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(null, brokerRegistration(1, true));
ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
int brokerId = 1;
changes.handleBrokerChange(null, brokerRegistration(brokerId, true), metrics);
assertEquals(1, changes.fencedBrokersChange());
assertEquals(0, changes.activeBrokersChange());
assertEquals(BrokerRegistrationState.FENCED.state(), metrics.brokerRegistrationState(brokerId));
}
@Test
public void testHandleBrokerFencing() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(brokerRegistration(1, false), brokerRegistration(1, true));
ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
int brokerId = 1;
changes.handleBrokerChange(brokerRegistration(brokerId, false), brokerRegistration(brokerId, true), metrics);
assertEquals(1, changes.fencedBrokersChange());
assertEquals(-1, changes.activeBrokersChange());
assertEquals(BrokerRegistrationState.FENCED.state(), metrics.brokerRegistrationState(brokerId));
}
@Test
public void testHandleBrokerInControlledShutdownFencing() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
int brokerId = 1;
changes.handleBrokerChange(brokerRegistration(brokerId, false, true), brokerRegistration(brokerId, true, true), metrics);
assertEquals(1, changes.fencedBrokersChange());
assertEquals(-1, changes.activeBrokersChange());
assertEquals(BrokerRegistrationState.FENCED.state(), metrics.brokerRegistrationState(brokerId));
}
@Test
public void testHandleBrokerUnfencing() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(brokerRegistration(1, true), brokerRegistration(1, false));
ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
int brokerId = 1;
changes.handleBrokerChange(brokerRegistration(brokerId, true), brokerRegistration(brokerId, false), metrics);
assertEquals(-1, changes.fencedBrokersChange());
assertEquals(1, changes.activeBrokersChange());
assertEquals(BrokerRegistrationState.ACTIVE.state(), metrics.brokerRegistrationState(brokerId));
}
@Test
public void testHandleBrokerControlledShutdown() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
int brokerId = 1;
changes.handleBrokerChange(brokerRegistration(brokerId, false), brokerRegistration(brokerId, false, true), metrics);
assertEquals(0, changes.fencedBrokersChange());
assertEquals(0, changes.activeBrokersChange());
assertEquals(1, changes.controlledShutdownBrokersChange());
assertEquals(BrokerRegistrationState.CONTROLLED_SHUTDOWN.state(), metrics.brokerRegistrationState(brokerId));
}
@Test
public void testHandleUnregisterBroker() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
int brokerId = 1;
changes.handleBrokerChange(brokerRegistration(brokerId, true, true), null, metrics);
assertEquals(-1, changes.fencedBrokersChange());
assertEquals(0, changes.activeBrokersChange());
assertEquals(-1, changes.controlledShutdownBrokersChange());
assertEquals(BrokerRegistrationState.UNREGISTERED.state(), metrics.brokerRegistrationState(brokerId));
}
@Test

View File

@ -45,6 +45,9 @@ public class ControllerMetricsTestUtils {
bld.append(name.getGroup());
bld.append(":type=").append(name.getType());
bld.append(",name=").append(name.getName());
if (name.hasScope()) {
bld.append(",").append(name.getScope().replaceAll("\\.", "="));
}
if (bld.toString().startsWith(expectedPrefix)) {
actual.add(bld.toString());
}

View File

@ -41,7 +41,9 @@ public class QuorumControllerMetricsTest {
try {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(
Optional.of(registry),
time)) {
time,
9000)) {
metrics.addTimeSinceLastHeartbeatMetric(1);
HashSet<String> expected = new HashSet<>(List.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
@ -53,7 +55,8 @@ public class QuorumControllerMetricsTest {
"kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
"kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
"kafka.controller:type=KafkaController,name=NewActiveControllersCount",
"kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount"
"kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount",
"kafka.controller:type=KafkaController,name=TimeSinceLastHeartbeatReceivedMs,broker=1"
));
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", expected);
}
@ -68,7 +71,7 @@ public class QuorumControllerMetricsTest {
public void testUpdateEventQueueTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
metrics.updateEventQueueTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
} finally {
@ -80,7 +83,7 @@ public class QuorumControllerMetricsTest {
public void testUpdateEventQueueProcessingTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
metrics.updateEventQueueProcessingTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
} finally {
@ -93,7 +96,7 @@ public class QuorumControllerMetricsTest {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
time.sleep(1000);
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
metrics.setLastAppliedRecordOffset(100);
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
@ -163,6 +166,28 @@ public class QuorumControllerMetricsTest {
}
}
@Test
public void testTimeSinceLastHeartbeatReceivedMs() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
int brokerId = 1;
int sessionTimeoutMs = 9000;
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, sessionTimeoutMs)) {
metrics.addTimeSinceLastHeartbeatMetric(1);
int numMetrics = registry.allMetrics().size();
Gauge<Integer> timeSinceLastHeartbeatReceivedMs = (Gauge<Integer>) registry.allMetrics().get(metricName("KafkaController", "TimeSinceLastHeartbeatReceivedMs", "broker=1"));
metrics.updateBrokerContactTime(brokerId);
time.sleep(1000);
assertEquals(1000, timeSinceLastHeartbeatReceivedMs.value());
time.sleep(100000);
assertEquals(sessionTimeoutMs, timeSinceLastHeartbeatReceivedMs.value());
metrics.removeTimeSinceLastHeartbeatMetrics();
assertEquals(numMetrics - 1, registry.allMetrics().size());
} finally {
registry.shutdown();
}
}
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
@ -174,4 +199,9 @@ public class QuorumControllerMetricsTest {
String mBeanName = String.format("kafka.controller:type=%s,name=%s", type, name);
return new MetricName("kafka.controller", type, name, null, mBeanName);
}
private static MetricName metricName(String type, String name, String scope) {
String mBeanName = String.format("kafka.controller:type=%s,name=%s,%s", type, name, scope);
return new MetricName("kafka.controller", type, name, scope, mBeanName);
}
}