mirror of https://github.com/apache/kafka.git
KAFKA-6486: Implemented LinkedHashMap in TimeWindows (#4628)
Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
4391a4214d
commit
02a8ef8595
|
|
@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream;
|
|||
import org.apache.kafka.streams.kstream.internals.TimeWindow;
|
||||
import org.apache.kafka.streams.processor.TimestampExtractor;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
|
@ -105,7 +105,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
|
|||
@Override
|
||||
public Map<Long, TimeWindow> windowsFor(final long timestamp) {
|
||||
long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
|
||||
final Map<Long, TimeWindow> windows = new HashMap<>();
|
||||
final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
|
||||
while (windowStart <= timestamp) {
|
||||
final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs);
|
||||
windows.put(windowStart, window);
|
||||
|
|
|
|||
|
|
@ -16,8 +16,12 @@
|
|||
*/
|
||||
package org.apache.kafka.streams.kstream.internals;
|
||||
|
||||
import org.apache.kafka.streams.kstream.TimeWindows;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
|
@ -117,4 +121,15 @@ public class TimeWindowTest {
|
|||
public void cannotCompareTimeWindowWithDifferentWindowType() {
|
||||
window.overlap(sessionWindow);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReturnMatchedWindowsOrderedByTimestamp() {
|
||||
final TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
|
||||
final Map<Long, TimeWindow> matched = windows.windowsFor(21L);
|
||||
|
||||
final Long[] expected = matched.keySet().toArray(new Long[matched.size()]);
|
||||
assertEquals(expected[0].longValue(), 10L);
|
||||
assertEquals(expected[1].longValue(), 15L);
|
||||
assertEquals(expected[2].longValue(), 20L);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue