diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b1ef62ca3a2..c7f9eaad7ea 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -497,6 +497,7 @@
+
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d28898590f8..b5501bd2a74 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -182,6 +182,10 @@
kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent
.
For further details, please refer to KIP-1100.
+
+ A new metric AvgIdleRatio
has been added to the ControllerEventManager
group. This metric measures the average idle ratio of the controller event queue thread,
+ providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
+
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3e1dd69723b..dfde76ecba5 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -406,7 +406,14 @@ public final class QuorumController implements Controller {
KafkaEventQueue queue = null;
try {
- queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+ queue = new KafkaEventQueue(
+ time,
+ logContext,
+ threadNamePrefix,
+ EventQueue.VoidEvent.INSTANCE,
+ controllerMetrics::updateIdleTime
+ );
+
return new QuorumController(
nonFatalFaultHandler,
fatalFaultHandler,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 310a2c1dd61..4a251faafc4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller.metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.metrics.TimeRatio;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
@@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
+ private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
+ "ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");
+
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";
@@ -75,6 +79,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer eventQueueTimeUpdater;
private final Consumer eventQueueProcessingTimeUpdater;
+ private final TimeRatio avgIdleTimeRatio;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
@@ -109,6 +114,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
+ this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge() {
@Override
public Long value() {
@@ -157,6 +163,20 @@ public class QuorumControllerMetrics implements AutoCloseable {
return newActiveControllers();
}
}));
+ registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge() {
+ @Override
+ public Double value() {
+ synchronized (avgIdleTimeRatio) {
+ return avgIdleTimeRatio.measure();
+ }
+ }
+ }));
+ }
+
+ public void updateIdleTime(long idleDurationMs) {
+ synchronized (avgIdleTimeRatio) {
+ avgIdleTimeRatio.record((double) idleDurationMs, time.milliseconds());
+ }
}
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
@@ -291,7 +311,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
- NEW_ACTIVE_CONTROLLERS_COUNT
+ NEW_ACTIVE_CONTROLLERS_COUNT,
+ AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 491d22f1cd8..4aa50a561df 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -45,6 +45,7 @@ public class QuorumControllerMetricsTest {
Set expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+ "kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
@@ -189,6 +190,35 @@ public class QuorumControllerMetricsTest {
}
}
+ @Test
+ public void testAvgIdleRatio() {
+ final double delta = 0.001;
+ MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
+ Gauge avgIdleRatio = (Gauge) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));
+
+ // No idle time recorded yet; returns default ratio of 1.0
+ assertEquals(1.0, avgIdleRatio.value(), delta);
+
+ // First recording is dropped to establish the interval start time
+ // This is because TimeRatio needs an initial timestamp to measure intervals from
+ metrics.updateIdleTime(10);
+ time.sleep(40);
+ metrics.updateIdleTime(20);
+ // avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
+ assertEquals(0.5, avgIdleRatio.value(), delta);
+
+ time.sleep(20);
+ metrics.updateIdleTime(1);
+ // avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
+ assertEquals(0.05, avgIdleRatio.value(), delta);
+
+ } finally {
+ registry.shutdown();
+ }
+ }
+
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index 87c5b217d8e..a90928d35f3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -29,6 +29,7 @@ import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.ReplicaKey;
import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.metrics.TimeRatio;
import java.util.List;
import java.util.OptionalLong;
diff --git a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index 42ebdd37d20..ad2c916e3fc 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
@@ -278,22 +279,22 @@ public final class KafkaEventQueue implements EventQueue {
remove(toRun);
continue;
}
- if (awaitNs == Long.MAX_VALUE) {
- try {
+
+ long startIdleMs = time.milliseconds();
+ try {
+ if (awaitNs == Long.MAX_VALUE) {
cond.await();
- } catch (InterruptedException e) {
- log.warn("Interrupted while waiting for a new event. " +
- "Shutting down event queue");
- interrupted = true;
- }
- } else {
- try {
+ } else {
cond.awaitNanos(awaitNs);
- } catch (InterruptedException e) {
- log.warn("Interrupted while waiting for a deferred event. " +
- "Shutting down event queue");
- interrupted = true;
}
+ } catch (InterruptedException e) {
+ log.warn(
+ "Interrupted while waiting for a {} event. Shutting down event queue",
+ (awaitNs == Long.MAX_VALUE) ? "new" : "deferred"
+ );
+ interrupted = true;
+ } finally {
+ idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
}
} finally {
lock.unlock();
@@ -440,12 +441,18 @@ public final class KafkaEventQueue implements EventQueue {
*/
private boolean interrupted;
+ /**
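+ * Callback invoked with the time, in milliseconds, that the event handler thread spent waiting for an event. Never null; a no-op when idle time is not tracked.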
+ */
+ private final Consumer idleTimeCallback;
+
+
public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix
) {
- this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
+ this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> { });
}
public KafkaEventQueue(
@@ -453,6 +460,16 @@ public final class KafkaEventQueue implements EventQueue {
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent
+ ) {
+ this(time, logContext, threadNamePrefix, cleanupEvent, __ -> { });
+ }
+
+ public KafkaEventQueue(
+ Time time,
+ LogContext logContext,
+ String threadNamePrefix,
+ Event cleanupEvent,
+ Consumer idleTimeCallback
) {
this.time = time;
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@@ -463,6 +480,7 @@ public final class KafkaEventQueue implements EventQueue {
this.eventHandler, false);
this.shuttingDown = false;
this.interrupted = false;
+ this.idleTimeCallback = Objects.requireNonNull(idleTimeCallback);
this.eventHandlerThread.start();
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java b/server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
similarity index 80%
rename from raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
rename to server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
index 357682b6fe2..8a1572c0273 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
+++ b/server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.internals;
+package org.apache.kafka.server.metrics;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -46,11 +46,26 @@ public class TimeRatio implements MeasurableStat {
@Override
public double measure(MetricConfig config, long currentTimestampMs) {
+ return measure();
+ }
+
+ @Override
+ public void record(MetricConfig config, double value, long currentTimestampMs) {
+ record(value, currentTimestampMs);
+ }
+
+ /**
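+ * Measures the ratio of recorded duration to the length of the interval that ends at the
+ * most recent recording, then starts a new interval at that recording.
+ *
+ * @return The measured ratio value between 0.0 and 1.0, or the default ratio if nothing has been recorded yet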
+ */
+ public double measure() {
if (lastRecordedTimestampMs < 0) {
// Return the default value if no recordings have been captured.
return defaultRatio;
} else {
- // We measure the ratio over the
+ // We measure the ratio over the interval
double intervalDurationMs = Math.max(lastRecordedTimestampMs - intervalStartTimestampMs, 0);
final double ratio;
if (intervalDurationMs == 0) {
@@ -61,15 +76,20 @@ public class TimeRatio implements MeasurableStat {
ratio = totalRecordedDurationMs / intervalDurationMs;
}
- // The next interval begins at the
+ // The next interval begins at the last recorded timestamp
intervalStartTimestampMs = lastRecordedTimestampMs;
totalRecordedDurationMs = 0;
return ratio;
}
}
- @Override
- public void record(MetricConfig config, double value, long currentTimestampMs) {
+ /**
+ * Records a duration value at the specified timestamp. The first call only establishes the interval start; its value is discarded.
+ *
+ * @param value The duration value to record
+ * @param currentTimestampMs The current timestamp in milliseconds
+ */
+ public void record(double value, long currentTimestampMs) {
if (intervalStartTimestampMs < 0) {
// Discard the initial value since the value occurred prior to the interval start
intervalStartTimestampMs = currentTimestampMs;
@@ -78,5 +98,4 @@ public class TimeRatio implements MeasurableStat {
lastRecordedTimestampMs = currentTimestampMs;
}
}
-
}
diff --git a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 54fc65a604a..d2d4526eef7 100644
--- a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -424,4 +424,48 @@ public class KafkaEventQueueTest {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
}
}
+
+ @Test
+ public void testIdleTimeCallback() throws Exception {
+ MockTime time = new MockTime();
+ AtomicLong lastIdleTimeMs = new AtomicLong(0);
+
+ try (KafkaEventQueue queue = new KafkaEventQueue(
+ time,
+ logContext,
+ "testIdleTimeCallback",
+ EventQueue.VoidEvent.INSTANCE,
+ lastIdleTimeMs::set)) {
+ time.sleep(2);
+ assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms");
+
+ // Test 1: Two events with a wait in between using FutureEvent
+ CompletableFuture event1 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event1, () -> {
+ time.sleep(1);
+ return "event1-processed";
+ }));
+ assertEquals("event1-processed", event1.get());
+
+ long waitTime5Ms = 5;
+ time.sleep(waitTime5Ms);
+ CompletableFuture event2 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event2, () -> {
+ time.sleep(1);
+ return "event2-processed";
+ }));
+ assertEquals("event2-processed", event2.get());
+ assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
+
+ // Test 2: Deferred event
+ long waitTime2Ms = 2;
+ CompletableFuture deferredEvent2 = new CompletableFuture<>();
+ queue.scheduleDeferred("deferred2",
+ __ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
+ () -> deferredEvent2.complete(null));
+ time.sleep(waitTime2Ms);
+ deferredEvent2.get();
+ assertEquals(waitTime2Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
+ }
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java b/server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
similarity index 98%
rename from raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java
rename to server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
index 94e8844734d..2c194a1448a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.internals;
+package org.apache.kafka.server.metrics;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;