KAFKA-19467; Add a metric for controller thread idleness (#20422)
CI / build (push) Waiting to run Details

This change adds the metric ControllerEventManager::AvgIdleRatio which
measures the amount of time the controller spends blocked waiting for
events vs the amount of time spent processing events. A value of 1.0
means that the controller spent the entire interval blocked waiting for
events.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Kevin Wu
 <kevin.wu2412@gmail.com>, Alyssa Huang <ahuang@confluent.io>, TengYao
 Chi <frankvicky@apache.org>, Chia-Ping Tsai
 <chia7712@apache.org>
This commit is contained in:
Mahsa Seifikar 2025-10-02 14:02:47 -04:00 committed by GitHub
parent 33cd114375
commit 8468317dac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 168 additions and 23 deletions

View File

@ -497,6 +497,7 @@
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault"/>
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />

View File

@ -182,6 +182,10 @@
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
</li>
<li>
A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
</li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

View File

@ -406,7 +406,14 @@ public final class QuorumController implements Controller {
KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
queue = new KafkaEventQueue(
time,
logContext,
threadNamePrefix,
EventQueue.VoidEvent.INSTANCE,
controllerMetrics::updateIdleTime
);
return new QuorumController(
nonFatalFaultHandler,
fatalFaultHandler,

View File

@ -19,6 +19,7 @@ package org.apache.kafka.controller.metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.TimeRatio;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
"ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";
@ -75,6 +79,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final TimeRatio avgIdleTimeRatio;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
@ -109,6 +114,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
@ -157,6 +163,20 @@ public class QuorumControllerMetrics implements AutoCloseable {
return newActiveControllers();
}
}));
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
@Override
public Double value() {
synchronized (avgIdleTimeRatio) {
return avgIdleTimeRatio.measure();
}
}
}));
}
/**
 * Feeds an idle duration into the time ratio that backs the AvgIdleRatio gauge.
 * The current time is read from this object's clock and used as the recording timestamp.
 *
 * @param idleDurationMs the idle duration to record, in milliseconds
 */
public void updateIdleTime(long idleDurationMs) {
    final double idleMs = idleDurationMs;
    // The gauge takes the same monitor when measuring, so recording and measuring never interleave.
    synchronized (avgIdleTimeRatio) {
        final long nowMs = time.milliseconds();
        avgIdleTimeRatio.record(idleMs, nowMs);
    }
}
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
@ -291,7 +311,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT
NEW_ACTIVE_CONTROLLERS_COUNT,
AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}

View File

@ -45,6 +45,7 @@ public class QuorumControllerMetricsTest {
Set<String> expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
@ -189,6 +190,35 @@ public class QuorumControllerMetricsTest {
}
}
/**
 * Checks the values reported by the AvgIdleRatio gauge: the default before any
 * recording, then the idle-to-interval ratio after each subsequent recording.
 */
@Test
public void testAvgIdleRatio() {
    final double tolerance = 0.001;
    MockTime clock = new MockTime();
    MetricsRegistry metricsRegistry = new MetricsRegistry();
    try (QuorumControllerMetrics controllerMetrics =
             new QuorumControllerMetrics(Optional.of(metricsRegistry), clock, 9000)) {
        Gauge<Double> idleRatioGauge = (Gauge<Double>) metricsRegistry
            .allMetrics()
            .get(metricName("ControllerEventManager", "AvgIdleRatio"));

        // Nothing has been recorded, so the gauge reports its default ratio of 1.0.
        assertEquals(1.0, idleRatioGauge.value(), tolerance);

        // The very first recording only marks where the measurement interval begins;
        // its duration is discarded.
        controllerMetrics.updateIdleTime(10);

        // 20ms idle over a 40ms interval => 0.5
        clock.sleep(40);
        controllerMetrics.updateIdleTime(20);
        assertEquals(0.5, idleRatioGauge.value(), tolerance);

        // 1ms idle over a 20ms interval => 0.05
        clock.sleep(20);
        controllerMetrics.updateIdleTime(1);
        assertEquals(0.05, idleRatioGauge.value(), tolerance);
    } finally {
        metricsRegistry.shutdown();
    }
}
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

View File

@ -29,6 +29,7 @@ import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.ReplicaKey;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.metrics.TimeRatio;
import java.util.List;
import java.util.OptionalLong;

View File

@ -33,6 +33,7 @@ import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
@ -278,22 +279,22 @@ public final class KafkaEventQueue implements EventQueue {
remove(toRun);
continue;
}
if (awaitNs == Long.MAX_VALUE) {
try {
long startIdleMs = time.milliseconds();
try {
if (awaitNs == Long.MAX_VALUE) {
cond.await();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a new event. " +
"Shutting down event queue");
interrupted = true;
}
} else {
try {
} else {
cond.awaitNanos(awaitNs);
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a deferred event. " +
"Shutting down event queue");
interrupted = true;
}
} catch (InterruptedException e) {
log.warn(
"Interrupted while waiting for a {} event. Shutting down event queue",
(awaitNs == Long.MAX_VALUE) ? "new" : "deferred"
);
interrupted = true;
} finally {
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
}
} finally {
lock.unlock();
@ -440,12 +441,18 @@ public final class KafkaEventQueue implements EventQueue {
*/
private boolean interrupted;
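/**
* Callback that receives the time, in milliseconds, spent waiting for an event.
* Never null; constructors that do not take a callback install a no-op.
*/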
private final Consumer<Long> idleTimeCallback;
public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix
) {
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> { });
}
public KafkaEventQueue(
@ -453,6 +460,16 @@ public final class KafkaEventQueue implements EventQueue {
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent
) {
this(time, logContext, threadNamePrefix, cleanupEvent, __ -> { });
}
public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent,
Consumer<Long> idleTimeCallback
) {
this.time = time;
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@ -463,6 +480,7 @@ public final class KafkaEventQueue implements EventQueue {
this.eventHandler, false);
this.shuttingDown = false;
this.interrupted = false;
this.idleTimeCallback = Objects.requireNonNull(idleTimeCallback);
this.eventHandlerThread.start();
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
@ -46,11 +46,26 @@ public class TimeRatio implements MeasurableStat {
/**
 * Stat-interface entry point for measuring. Neither argument is consulted;
 * the result comes from the no-argument {@code measure()}.
 *
 * @param config unused
 * @param currentTimestampMs unused
 * @return the value returned by {@code measure()}
 */
@Override
public double measure(MetricConfig config, long currentTimestampMs) {
    final double ratio = this.measure();
    return ratio;
}
/**
 * Stat-interface entry point for recording. The config is not consulted;
 * the value and timestamp are forwarded to {@code record(double, long)}.
 *
 * @param config unused
 * @param value the duration value to record
 * @param currentTimestampMs the timestamp of the recording, in milliseconds
 */
@Override
public void record(MetricConfig config, double value, long currentTimestampMs) {
    this.record(value, currentTimestampMs);
}
/**
* Measures the ratio of recorded duration to the interval duration
* since the last measurement.
*
* @return The measured ratio value between 0.0 and 1.0
*/
public double measure() {
if (lastRecordedTimestampMs < 0) {
// Return the default value if no recordings have been captured.
return defaultRatio;
} else {
// We measure the ratio over the
// We measure the ratio over the interval
double intervalDurationMs = Math.max(lastRecordedTimestampMs - intervalStartTimestampMs, 0);
final double ratio;
if (intervalDurationMs == 0) {
@ -61,15 +76,20 @@ public class TimeRatio implements MeasurableStat {
ratio = totalRecordedDurationMs / intervalDurationMs;
}
// The next interval begins at the
// The next interval begins at the last recorded timestamp
intervalStartTimestampMs = lastRecordedTimestampMs;
totalRecordedDurationMs = 0;
return ratio;
}
}
@Override
public void record(MetricConfig config, double value, long currentTimestampMs) {
/**
* Records a duration value at the specified timestamp.
*
* @param value The duration value to record
* @param currentTimestampMs The current timestamp in milliseconds
*/
public void record(double value, long currentTimestampMs) {
if (intervalStartTimestampMs < 0) {
// Discard the initial value since the value occurred prior to the interval start
intervalStartTimestampMs = currentTimestampMs;
@ -78,5 +98,4 @@ public class TimeRatio implements MeasurableStat {
lastRecordedTimestampMs = currentTimestampMs;
}
}
}

View File

@ -424,4 +424,48 @@ public class KafkaEventQueueTest {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
}
}
/**
 * Checks the idle durations that a KafkaEventQueue passes to its idle-time callback,
 * first around two appended events and then around a deferred event. The queue runs on
 * a mock clock and the most recent callback value is captured in an AtomicLong.
 */
@Test
public void testIdleTimeCallback() throws Exception {
MockTime time = new MockTime();
// Holds the latest value handed to the idle-time callback.
AtomicLong lastIdleTimeMs = new AtomicLong(0);
try (KafkaEventQueue queue = new KafkaEventQueue(
time,
logContext,
"testIdleTimeCallback",
EventQueue.VoidEvent.INSTANCE,
lastIdleTimeMs::set)) {
// No event has been appended yet; the captured idle time is expected to still be 0
// after the mock clock moves forward.
time.sleep(2);
assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms");
// Test 1: Two events with a wait in between using FutureEvent
CompletableFuture<String> event1 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event1, () -> {
// Advance the mock clock by 1ms from inside the event.
time.sleep(1);
return "event1-processed";
}));
// Block until the first event has completed.
assertEquals("event1-processed", event1.get());
long waitTime5Ms = 5;
// Advance the mock clock between the two events; this gap is the idle time asserted below.
time.sleep(waitTime5Ms);
CompletableFuture<String> event2 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event2, () -> {
time.sleep(1);
return "event2-processed";
}));
assertEquals("event2-processed", event2.get());
assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
// Test 2: Deferred event
long waitTime2Ms = 2;
CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
// Schedule the event for waitTime2Ms after the current mock time.
queue.scheduleDeferred("deferred2",
__ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
() -> deferredEvent2.complete(null));
// Move the mock clock forward by the deferral, then wait for the deferred event to complete.
time.sleep(waitTime2Ms);
deferredEvent2.get();
assertEquals(waitTime2Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
}
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;