diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java index 0218fa13fa8..5693e3ea994 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java @@ -71,7 +71,7 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable { * Record the thread idle time. * @param idleTimeMs The idle time in milliseconds. */ - void recordThreadIdleTime(long idleTimeMs); + void recordThreadIdleTime(double idleTimeMs); /** * Register the event queue size gauge. diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java index 591b37e2fb4..a95f590c5b2 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java @@ -300,7 +300,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics } @Override - public void recordThreadIdleTime(long idleTimeMs) { + public void recordThreadIdleTime(double idleTimeMs) { threadIdleSensor.record(idleTimeMs); } diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessor.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessor.java index 14da4f040f3..e7f631c3852 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessor.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessor.java @@ -138,7 +138,7 @@ public final class MultiThreadedEventProcessor implements CoordinatorEventProces CoordinatorEvent event = accumulator.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); long idleEndTimeMs = time.milliseconds(); long idleTimeMs = idleEndTimeMs - idleStartTimeMs; - metrics.recordThreadIdleTime(idleTimeMs / threads.size()); + metrics.recordThreadIdleTime((double) idleTimeMs / (double) threads.size()); if (event != null) { try { log.debug("Executing event: {}.", event); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java index 7285b58ffab..ed6d2697634 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java @@ -137,7 +137,7 @@ public class CoordinatorRuntimeMetricsImplTest { Metrics metrics = new Metrics(time); CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP); - IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleTime((i + 1) * 1000L)); + IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleTime((i + 1) * 1000.0)); org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, "thread-idle-ratio-avg"); KafkaMetric metric = metrics.metrics().get(metricName); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessorTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessorTest.java index a2003b67808..6018055c602 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessorTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MultiThreadedEventProcessorTest.java @@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -451,7 +451,7 @@ public class MultiThreadedEventProcessorTest { // Second event (e2) // e1, e2 poll time - verify(mockRuntimeMetrics, times(2)).recordThreadIdleTime(500L); + verify(mockRuntimeMetrics, times(2)).recordThreadIdleTime(500.0); // event queue time = e2 enqueue time + e2 poll time verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(3500L); } @@ -470,12 +470,12 @@ public class MultiThreadedEventProcessorTest { mockRuntimeMetrics, new DelayEventAccumulator(time, 100L) )) { - List recordedIdleTimesMs = new ArrayList<>(); + List recordedIdleTimesMs = new ArrayList<>(); AtomicInteger numEventsExecuted = new AtomicInteger(0); - ArgumentCaptor idleTimeCaptured = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor idleTimeCaptured = ArgumentCaptor.forClass(Double.class); doAnswer(invocation -> { - long threadIdleTime = idleTimeCaptured.getValue(); - assertEquals(100, threadIdleTime); + double threadIdleTime = idleTimeCaptured.getValue(); + assertEquals(100.0, threadIdleTime); // No synchronization required as the test uses a single event processor thread. recordedIdleTimesMs.add(threadIdleTime); @@ -507,12 +507,12 @@ public class MultiThreadedEventProcessorTest { }); assertEquals(events.size(), numEventsExecuted.get()); - verify(mockRuntimeMetrics, times(8)).recordThreadIdleTime(anyLong()); + verify(mockRuntimeMetrics, times(8)).recordThreadIdleTime(anyDouble()); assertEquals(8, recordedIdleTimesMs.size()); long diff = time.milliseconds() - startMs; - long sum = recordedIdleTimesMs.stream().mapToLong(Long::longValue).sum(); - double idleRatio = (double) sum / diff; + double sum = recordedIdleTimesMs.stream().mapToDouble(Double::doubleValue).sum(); + double idleRatio = sum / diff; assertEquals(1.0, idleRatio, "idle ratio should be 1.0 but was: " + idleRatio); }