KAFKA-18807; Fix thread idle ratio metric (#18934)

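When group.coordinator.threads is greater than 1, we lose track of thread idle time because each thread's idle time is divided by the number of threads using integer arithmetic, which truncates the result before it is recorded. Use doubles instead so that the fractional part is kept.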

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Sean Quah 2025-02-18 07:11:38 +00:00 committed by GitHub
parent 2ecc16b987
commit 1c9190d6b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 13 additions and 13 deletions

View File

@ -71,7 +71,7 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
* Record the thread idle time.
* @param idleTimeMs The idle time in milliseconds.
*/
void recordThreadIdleTime(long idleTimeMs);
void recordThreadIdleTime(double idleTimeMs);
/**
* Register the event queue size gauge.

View File

@ -300,7 +300,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
}
@Override
public void recordThreadIdleTime(long idleTimeMs) {
public void recordThreadIdleTime(double idleTimeMs) {
threadIdleSensor.record(idleTimeMs);
}

View File

@ -138,7 +138,7 @@ public final class MultiThreadedEventProcessor implements CoordinatorEventProces
CoordinatorEvent event = accumulator.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
long idleEndTimeMs = time.milliseconds();
long idleTimeMs = idleEndTimeMs - idleStartTimeMs;
metrics.recordThreadIdleTime(idleTimeMs / threads.size());
metrics.recordThreadIdleTime((double) idleTimeMs / (double) threads.size());
if (event != null) {
try {
log.debug("Executing event: {}.", event);

View File

@ -137,7 +137,7 @@ public class CoordinatorRuntimeMetricsImplTest {
Metrics metrics = new Metrics(time);
CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleTime((i + 1) * 1000L));
IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleTime((i + 1) * 1000.0));
org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, "thread-idle-ratio-avg");
KafkaMetric metric = metrics.metrics().get(metricName);

View File

@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -451,7 +451,7 @@ public class MultiThreadedEventProcessorTest {
// Second event (e2)
// e1, e2 poll time
verify(mockRuntimeMetrics, times(2)).recordThreadIdleTime(500L);
verify(mockRuntimeMetrics, times(2)).recordThreadIdleTime(500.0);
// event queue time = e2 enqueue time + e2 poll time
verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(3500L);
}
@ -470,12 +470,12 @@ public class MultiThreadedEventProcessorTest {
mockRuntimeMetrics,
new DelayEventAccumulator(time, 100L)
)) {
List<Long> recordedIdleTimesMs = new ArrayList<>();
List<Double> recordedIdleTimesMs = new ArrayList<>();
AtomicInteger numEventsExecuted = new AtomicInteger(0);
ArgumentCaptor<Long> idleTimeCaptured = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Double> idleTimeCaptured = ArgumentCaptor.forClass(Double.class);
doAnswer(invocation -> {
long threadIdleTime = idleTimeCaptured.getValue();
assertEquals(100, threadIdleTime);
double threadIdleTime = idleTimeCaptured.getValue();
assertEquals(100.0, threadIdleTime);
// No synchronization required as the test uses a single event processor thread.
recordedIdleTimesMs.add(threadIdleTime);
@ -507,12 +507,12 @@ public class MultiThreadedEventProcessorTest {
});
assertEquals(events.size(), numEventsExecuted.get());
verify(mockRuntimeMetrics, times(8)).recordThreadIdleTime(anyLong());
verify(mockRuntimeMetrics, times(8)).recordThreadIdleTime(anyDouble());
assertEquals(8, recordedIdleTimesMs.size());
long diff = time.milliseconds() - startMs;
long sum = recordedIdleTimesMs.stream().mapToLong(Long::longValue).sum();
double idleRatio = (double) sum / diff;
double sum = recordedIdleTimesMs.stream().mapToDouble(Double::doubleValue).sum();
double idleRatio = sum / diff;
assertEquals(1.0, idleRatio, "idle ratio should be 1.0 but was: " + idleRatio);
}