mirror of https://github.com/apache/kafka.git
add flush interval test to runtime
This commit is contained in:
parent
4831bc829b
commit
947482b9df
|
@ -68,9 +68,9 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
|
||||||
void recordFlushTime(long durationMs);
|
void recordFlushTime(long durationMs);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Record the flush time.
|
* Record the flush interval time.
|
||||||
*
|
*
|
||||||
* @param durationMs The flush time in milliseconds.
|
* @param durationMs The flush interval time in milliseconds.
|
||||||
*/
|
*/
|
||||||
void recordFlushIntervalTime(long durationMs);
|
void recordFlushIntervalTime(long durationMs);
|
||||||
|
|
||||||
|
|
|
@ -4314,7 +4314,7 @@ public class CoordinatorRuntimeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRecordFlushTime() throws Exception {
|
public void testRecordFlushTimeAndFlushIntervalTime() throws Exception {
|
||||||
MockTimer timer = new MockTimer();
|
MockTimer timer = new MockTimer();
|
||||||
|
|
||||||
// Writer sleeps for 10ms before appending records.
|
// Writer sleeps for 10ms before appending records.
|
||||||
|
@ -4394,6 +4394,7 @@ public class CoordinatorRuntimeTest {
|
||||||
assertEquals(List.of(
|
assertEquals(List.of(
|
||||||
records(firstBatchTimestamp, records.subList(0, 3))
|
records(firstBatchTimestamp, records.subList(0, 3))
|
||||||
), writer.entries(TP));
|
), writer.entries(TP));
|
||||||
|
verify(runtimeMetrics, times(1)).recordFlushIntervalTime(0);
|
||||||
verify(runtimeMetrics, times(1)).recordFlushTime(10);
|
verify(runtimeMetrics, times(1)).recordFlushTime(10);
|
||||||
|
|
||||||
// Advance past the linger time.
|
// Advance past the linger time.
|
||||||
|
@ -4412,6 +4413,10 @@ public class CoordinatorRuntimeTest {
|
||||||
records(secondBatchTimestamp, records.subList(0, 3)),
|
records(secondBatchTimestamp, records.subList(0, 3)),
|
||||||
records(secondBatchTimestamp, records.subList(3, 4))
|
records(secondBatchTimestamp, records.subList(3, 4))
|
||||||
), writer.entries(TP));
|
), writer.entries(TP));
|
||||||
|
|
||||||
|
// If the previous batch was full, this batch was allocated prior to previous flush.
|
||||||
|
// flush interval time = previous flush time + append.linger.ms
|
||||||
|
verify(runtimeMetrics, times(1)).recordFlushIntervalTime(21);
|
||||||
verify(runtimeMetrics, times(2)).recordFlushTime(10);
|
verify(runtimeMetrics, times(2)).recordFlushTime(10);
|
||||||
|
|
||||||
// Commit and verify that writes are completed.
|
// Commit and verify that writes are completed.
|
||||||
|
|
Loading…
Reference in New Issue