diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java index 6c876931ab0..8b68dd608f8 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java @@ -68,9 +68,9 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable { void recordFlushTime(long durationMs); /** - * Record the flush time. + * Record the flush interval time. * - * @param durationMs The flush time in milliseconds. + * @param durationMs The flush interval time in milliseconds. */ void recordFlushIntervalTime(long durationMs); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index a2f25b24a4c..29d954f9bdc 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -4314,7 +4314,7 @@ public class CoordinatorRuntimeTest { } @Test - public void testRecordFlushTime() throws Exception { + public void testRecordFlushTimeAndFlushIntervalTime() throws Exception { MockTimer timer = new MockTimer(); // Writer sleeps for 10ms before appending records. @@ -4394,6 +4394,7 @@ public class CoordinatorRuntimeTest { assertEquals(List.of( records(firstBatchTimestamp, records.subList(0, 3)) ), writer.entries(TP)); + verify(runtimeMetrics, times(1)).recordFlushIntervalTime(0); verify(runtimeMetrics, times(1)).recordFlushTime(10); // Advance past the linger time. @@ -4412,6 +4413,10 @@ public class CoordinatorRuntimeTest { records(secondBatchTimestamp, records.subList(0, 3)), records(secondBatchTimestamp, records.subList(3, 4)) ), writer.entries(TP)); + + // If the previous batch was full, this batch was allocated prior to previous flush. + // flush interval time = previous flush time + append.linger.ms + verify(runtimeMetrics, times(1)).recordFlushIntervalTime(21); verify(runtimeMetrics, times(2)).recordFlushTime(10); // Commit and verify that writes are completed.