Adding reverse iterator usage for sliding windows processing (extending KIP-450) (#9239)

Add a backwardFetch call to the window store for sliding window
processing. While the implementation works with the forward call
to the window store, using backwardFetch allows the iterator
to be closed earlier, making the implementation more efficient.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <vvcephei@apache.org>
This commit is contained in:
leah 2020-09-11 16:38:17 -05:00 committed by GitHub
parent 8260d7cdfb
commit 2194ccba5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 606 additions and 319 deletions

View File

@ -203,6 +203,9 @@
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)" <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/> files="Murmur3Test.java"/>
<suppress checks="MethodLength"
files="KStreamSlidingWindowAggregateTest.java"/>
<!-- Streams test-utils --> <!-- Streams test-utils -->
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files="TopologyTestDriver.java"/> files="TopologyTestDriver.java"/>

View File

@ -83,6 +83,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
private Sensor lateRecordDropSensor; private Sensor lateRecordDropSensor;
private Sensor droppedRecordsSensor; private Sensor droppedRecordsSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private Boolean reverseIteratorPossible = null;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
@ -150,8 +151,23 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
return; return;
} }
if (reverseIteratorPossible == null) {
try {
windowStore.backwardFetch(key, 0L, 0L);
reverseIteratorPossible = true;
log.debug("Sliding Windows aggregate using a reverse iterator");
} catch (final UnsupportedOperationException e) {
reverseIteratorPossible = false;
log.debug("Sliding Windows aggregate using a forward iterator");
}
}
if (reverseIteratorPossible) {
processReverse(key, value, inputRecordTimestamp, closeTime);
} else {
processInOrder(key, value, inputRecordTimestamp, closeTime); processInOrder(key, value, inputRecordTimestamp, closeTime);
} }
}
public void processInOrder(final K key, final V value, final long inputRecordTimestamp, final long closeTime) { public void processInOrder(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
@ -172,7 +188,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
key, key,
key, key,
Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()), Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
// to catch the current record's right window, if it exists, without more calls to the store // add 1 to upper bound to catch the current record's right window, if it exists, without more calls to the store
inputRecordTimestamp + 1) inputRecordTimestamp + 1)
) { ) {
while (iterator.hasNext()) { while (iterator.hasNext()) {
@ -205,33 +221,67 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
} }
} }
} }
createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
//create right window for previous record
if (previousRecordTimestamp != null) {
final long previousRightWinStart = previousRecordTimestamp + 1;
if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
}
} }
//create left window for new record public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
if (!leftWinAlreadyCreated) {
final ValueAndTimestamp<Agg> valueAndTime; final Set<Long> windowStartTimes = new HashSet<>();
// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) { // aggregate that will go in the current records left/right window (if needed)
valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp); ValueAndTimestamp<Agg> leftWinAgg = null;
ValueAndTimestamp<Agg> rightWinAgg = null;
//if current record's left/right windows already exist
boolean leftWinAlreadyCreated = false;
boolean rightWinAlreadyCreated = false;
Long previousRecordTimestamp = null;
try (
final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
key,
key,
Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
// add 1 to upper bound to catch the current record's right window, if it exists, without more calls to the store
inputRecordTimestamp + 1)
) {
while (iterator.hasNext()) {
final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
final long startTime = windowBeingProcessed.key.window().start();
windowStartTimes.add(startTime);
final long endTime = startTime + windows.timeDifferenceMs();
final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
if (startTime == inputRecordTimestamp + 1) {
rightWinAlreadyCreated = true;
} else if (endTime > inputRecordTimestamp) {
if (rightWinAgg == null) {
rightWinAgg = windowBeingProcessed.value;
}
updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
} else if (endTime == inputRecordTimestamp) {
leftWinAlreadyCreated = true;
updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
if (windowMaxRecordTimestamp < inputRecordTimestamp) {
previousRecordTimestamp = windowMaxRecordTimestamp;
} else { } else {
valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp); return;
} }
final TimeWindow window = new TimeWindow(inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp); } else if (endTime < inputRecordTimestamp) {
updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp); leftWinAgg = windowBeingProcessed.value;
previousRecordTimestamp = windowMaxRecordTimestamp;
break;
} else {
log.error(
"Unexpected window with start {} found when processing record at {} in `KStreamSlidingWindowAggregate`.",
startTime, inputRecordTimestamp
);
throw new IllegalStateException("Unexpected window found when processing sliding windows");
} }
if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, inputRecordTimestamp)) {
createCurrentRecordRightWindow(inputRecordTimestamp, rightWinAgg, key);
} }
} }
createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
}
/** /**
* Created to handle records where 0 < inputRecordTimestamp < timeDifferenceMs. These records would create * Created to handle records where 0 < inputRecordTimestamp < timeDifferenceMs. These records would create
@ -260,7 +310,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
key, key,
key, key,
0, 0,
// to catch the current record's right window, if it exists, without more calls to the store // add 1 to upper bound to catch the current record's right window, if it exists, without more calls to the store
inputRecordTimestamp + 1) inputRecordTimestamp + 1)
) { ) {
while (iterator.hasNext()) { while (iterator.hasNext()) {
@ -310,9 +360,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
//create the right window for the previous record if the previous record exists and the window hasn't already been created //create the right window for the previous record if the previous record exists and the window hasn't already been created
if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) { if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs()); createPreviousRecordRightWindow(previousRecordTimestamp + 1, inputRecordTimestamp, key, value, closeTime);
final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
} }
if (combinedWindow == null) { if (combinedWindow == null) {
@ -327,6 +375,42 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
} }
/**
 * Creates the windows that the record being processed makes necessary, in this order:
 * the previous record's right window, the new record's left window, and the new record's
 * right window. Each one is only created when the corresponding check below passes.
 *
 * @param key                     key of the record being processed
 * @param value                   value of the record being processed
 * @param inputRecordTimestamp    timestamp of the record being processed
 * @param closeTime               passed through unchanged to {@code updateWindowAndForward}
 * @param windowStartTimes        start times of the windows already seen for this key
 * @param rightWinAgg             aggregate handed to the new record's right window, if one is created
 * @param leftWinAgg              aggregate whose value seeds the new record's left window when that window is not empty
 * @param leftWinAlreadyCreated   whether the new record's left window already exists
 * @param rightWinAlreadyCreated  whether the new record's right window already exists
 * @param previousRecordTimestamp timestamp of the previous record, or {@code null} if none was found
 */
private void createWindows(final K key,
                           final V value,
                           final long inputRecordTimestamp,
                           final long closeTime,
                           final Set<Long> windowStartTimes,
                           final ValueAndTimestamp<Agg> rightWinAgg,
                           final ValueAndTimestamp<Agg> leftWinAgg,
                           final boolean leftWinAlreadyCreated,
                           final boolean rightWinAlreadyCreated,
                           final Long previousRecordTimestamp) {
    // right window of the previous record: starts one millisecond after that record
    if (previousRecordTimestamp != null
        && previousRecordRightWindowDoesNotExistAndIsNotEmpty(windowStartTimes, previousRecordTimestamp + 1, inputRecordTimestamp)) {
        createPreviousRecordRightWindow(previousRecordTimestamp + 1, inputRecordTimestamp, key, value, closeTime);
    }

    // left window of the new record: ends exactly at the new record's timestamp
    if (!leftWinAlreadyCreated) {
        // seed with the existing aggregate when the previous record falls inside this window, otherwise start fresh
        final Agg seed = leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)
            ? leftWinAgg.value()
            : initializer.apply();
        final ValueAndTimestamp<Agg> leftWinValue = ValueAndTimestamp.make(seed, inputRecordTimestamp);
        final long leftWinStart = inputRecordTimestamp - windows.timeDifferenceMs();
        updateWindowAndForward(new TimeWindow(leftWinStart, inputRecordTimestamp), leftWinValue, key, value, closeTime, inputRecordTimestamp);
    }

    // right window of the new record: nothing to do if it exists already or would be empty
    if (rightWinAlreadyCreated || !rightWindowIsNotEmpty(rightWinAgg, inputRecordTimestamp)) {
        return;
    }
    createCurrentRecordRightWindow(inputRecordTimestamp, rightWinAgg, key);
}
private void createCurrentRecordRightWindow(final long inputRecordTimestamp, private void createCurrentRecordRightWindow(final long inputRecordTimestamp,
final ValueAndTimestamp<Agg> rightWinAgg, final ValueAndTimestamp<Agg> rightWinAgg,
final K key) { final K key) {
@ -342,13 +426,23 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
rightWinAgg.timestamp()); rightWinAgg.timestamp());
} }
/**
 * Creates the right window of the previous record and applies the current record to it.
 * The window spans {@code [windowStart, windowStart + timeDifferenceMs]} and starts from a
 * freshly initialized aggregate stamped with the current record's timestamp.
 *
 * @param windowStart          start time of the window to create
 * @param inputRecordTimestamp timestamp of the record being processed
 * @param key                  key of the record being processed
 * @param value                value of the record being processed
 * @param closeTime            passed through unchanged to {@code updateWindowAndForward}
 */
private void createPreviousRecordRightWindow(final long windowStart,
                                             final long inputRecordTimestamp,
                                             final K key,
                                             final V value,
                                             final long closeTime) {
    // arguments are evaluated left to right, so the window is built before the initializer runs
    updateWindowAndForward(
        new TimeWindow(windowStart, windowStart + windows.timeDifferenceMs()),
        ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp),
        key,
        value,
        closeTime,
        inputRecordTimestamp);
}
// checks if the previous record falls into the current records left window; if yes, the left window is not empty, otherwise it is empty // checks if the previous record falls into the current records left window; if yes, the left window is not empty, otherwise it is empty
private boolean leftWindowNotEmpty(final Long previousRecordTimestamp, final long inputRecordTimestamp) { private boolean leftWindowNotEmpty(final Long previousRecordTimestamp, final long inputRecordTimestamp) {
return previousRecordTimestamp != null && inputRecordTimestamp - windows.timeDifferenceMs() <= previousRecordTimestamp; return previousRecordTimestamp != null && inputRecordTimestamp - windows.timeDifferenceMs() <= previousRecordTimestamp;
} }
// checks if the previous record's right window does not already exist and the current record falls within previous record's right window // checks if the previous record's right window does not already exist and the current record falls within previous record's right window
private boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes, private boolean previousRecordRightWindowDoesNotExistAndIsNotEmpty(final Set<Long> windowStartTimes,
final long previousRightWindowStart, final long previousRightWindowStart,
final long inputRecordTimestamp) { final long inputRecordTimestamp) {
return !windowStartTimes.contains(previousRightWindowStart) && previousRightWindowStart + windows.timeDifferenceMs() >= inputRecordTimestamp; return !windowStartTimes.contains(previousRightWindowStart) && previousRightWindowStart + windows.timeDifferenceMs() >= inputRecordTimestamp;

View File

@ -68,7 +68,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private volatile boolean open = false; private volatile boolean open = false;
InMemoryWindowStore(final String name, public InMemoryWindowStore(final String name,
final long retentionPeriod, final long retentionPeriod,
final long windowSize, final long windowSize,
final boolean retainDuplicates, final boolean retainDuplicates,

View File

@ -50,7 +50,9 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -239,52 +241,81 @@ public class KGroupedStreamImplTest {
inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("2", "B", 1000L);
inputTopic.pipeInput("3", "C", 600L); inputTopic.pipeInput("3", "C", 600L);
} }
assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =
Comparator.comparing((KeyValueTimestamp<Windowed<String>, Long> o) -> o.key().key())
.thenComparing((KeyValueTimestamp<Windowed<String>, Long> o) -> o.key().window().start());
final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> actual = supplier.theCapturedProcessor().processed();
actual.sort(comparator);
assertThat(actual, equalTo(Arrays.asList(
// processing A@500 // processing A@500
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
// processing A@600
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
// processing A@999 // processing A@999
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
// processing A@600 // processing A@600
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
// processing B@500
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
// processing B@600
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
// processing B@700
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
// processing C@501
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
// processing first A@1000 // processing first A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
// processing second A@1000 // processing second A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
// processing A@999
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
// processing A@600
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
// processing first A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
// processing second A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
// processing A@600
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
// processing first A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
// processing second A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
// processing first A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
// processing second A@1000
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
// processing B@500
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
// processing B@600
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
// processing B@700
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
// processing first B@1000 // processing first B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L), new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
// processing second B@1000 // processing second B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L), new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
// processing B@600
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
// processing B@700
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
// processing first B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
// processing second B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L), new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
// processing B@700
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
// processing first B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
// processing second B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L), new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
// processing first B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
// processing second B@1000
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L), new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
// processing C@600
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
// processing C@501
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
// processing C@600
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L),
// processing C@600
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L)
))); )));
} }

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
@ -33,11 +32,18 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockInitializer;
@ -47,9 +53,15 @@ import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -69,7 +81,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class KStreamSlidingWindowAggregateTest { public class KStreamSlidingWindowAggregateTest {
/**
 * Supplies the parameter sets for the parameterized runner: one single-element set holding
 * {@code false} and one holding {@code true}, in that order.
 *
 * @return the two parameter sets
 */
@Parameterized.Parameters(name = "{0}")
public static Collection<Boolean[]> data() {
    final Boolean[] firstRun = {false};
    final Boolean[] secondRun = {true};
    return Arrays.asList(firstRun, secondRun);
}
@Parameterized.Parameter
public boolean inOrderIterator;
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName(); private final String threadId = Thread.currentThread().getName();
@ -79,6 +104,10 @@ public class KStreamSlidingWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic"; final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final KTable<Windowed<String>, String> table = builder final KTable<Windowed<String>, String> table = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String())) .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@ -86,7 +115,7 @@ public class KStreamSlidingWindowAggregateTest {
.aggregate( .aggregate(
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) Materialized.as(storeSupplier)
); );
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table.toStream().process(supplier); table.toStream().process(supplier);
@ -131,6 +160,10 @@ public class KStreamSlidingWindowAggregateTest {
public void testReduceSmallInput() { public void testReduceSmallInput() {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic"; final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final KTable<Windowed<String>, String> table = builder final KTable<Windowed<String>, String> table = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String())) .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
@ -138,7 +171,7 @@ public class KStreamSlidingWindowAggregateTest {
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
.reduce( .reduce(
MockReducer.STRING_ADDER, MockReducer.STRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) Materialized.as(storeSupplier)
); );
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table.toStream().process(supplier); table.toStream().process(supplier);
@ -184,6 +217,10 @@ public class KStreamSlidingWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1"; final String topic1 = "topic1";
final WindowBytesStoreSupplier storeSupplier =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final KTable<Windowed<String>, String> table2 = builder final KTable<Windowed<String>, String> table2 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@ -191,8 +228,9 @@ public class KStreamSlidingWindowAggregateTest {
.aggregate( .aggregate(
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) Materialized.as(storeSupplier)
); );
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table2.toStream().process(supplier); table2.toStream().process(supplier);
@ -222,112 +260,113 @@ public class KStreamSlidingWindowAggregateTest {
inputTopic1.pipeInput("D", "3", 29L); inputTopic1.pipeInput("D", "3", 29L);
inputTopic1.pipeInput("D", "5", 16L); inputTopic1.pipeInput("D", "5", 16L);
} }
final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
.thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
actual.sort(comparator);
assertEquals( assertEquals(
asList( asList(
// FINAL WINDOW: A@10 left window created when A@10 processed // FINAL WINDOW: A@10 left window created when A@10 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10),
// A@10 right window created when A@20 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20),
// A@20 left window created when A@20 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20),
// FINAL WINDOW: A@20 right window created when A@22 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22),
// A@22 left window created when A@22 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22),
// FINAL WINDOW: A@20 left window updated when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20),
// FINAL WINDOW: A@10 right window updated when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20),
// FINAL WINDOW: A@22 left window updated when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22),
// FINAL WINDOW: A@15 left window created when A@15 processed // FINAL WINDOW: A@15 left window created when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15),
// A@20 left window created when A@20 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20),
// FINAL WINDOW: A@20 left window updated when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20),
// A@10 right window created when A@20 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20),
// FINAL WINDOW: A@10 right window updated when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20),
// A@22 left window created when A@22 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22),
// FINAL WINDOW: A@22 left window updated when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22),
// FINAL WINDOW: A@15 right window created when A@15 processed // FINAL WINDOW: A@15 right window created when A@15 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22),
// FINAL WINDOW: A@20 right window created when A@22 processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22),
// FINAL WINDOW: B@12 left window created when B@12 processed // FINAL WINDOW: B@12 left window created when B@12 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12),
// B@12 right window created when B@13 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13),
// FINAL WINDOW: B@13 left window created when B@13 processed // FINAL WINDOW: B@13 left window created when B@13 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13),
// B@12 right window updated when B@18 processed // FINAL WINDOW: B@14 left window created when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3", 18), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14),
// B@13 right window created when B@18 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18),
// B@18 left window created when B@18 processed // B@18 left window created when B@18 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18),
// B@12 right window updated when B@19 processed // FINAL WINDOW: B@18 left window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18),
// B@13 right window updated when B@19 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19),
// B@18 right window created when B@19 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19),
// B@19 left window created when B@19 processed // B@19 left window created when B@19 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19),
// FINAL WINDOW: B@19 left window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
// B@12 right window created when B@13 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13),
// B@12 right window updated when B@18 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3", 18),
// B@12 right window updated when B@19 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19),
// FINAL WINDOW: B@12 right window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19),
// B@13 right window created when B@18 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18),
// B@13 right window updated when B@19 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19),
// FINAL WINDOW: B@13 right window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19),
// FINAL WINDOW: B@25 left window created when B@25 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25),
// B@18 right window created when B@19 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19),
// FINAL WINDOW: B@18 right window updated when B@25 processed // FINAL WINDOW: B@18 right window updated when B@25 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25),
// FINAL WINDOW: B@19 right window updated when B@25 processed // FINAL WINDOW: B@19 right window updated when B@25 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25),
// FINAL WINDOW: B@25 left window created when B@25 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25),
// FINAL WINDOW: B@18 left window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18),
// FINAL WINDOW: B@19 left window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
// FINAL WINDOW: B@12 right window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19),
// FINAL WINDOW: B@13 right window updated when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19),
// FINAL WINDOW: B@14 left window created when B@14 processed
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14),
// FINAL WINDOW: C@11 left window created when C@11 processed // FINAL WINDOW: C@11 left window created when C@11 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11),
// C@11 right window created when C@15 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15),
// FINAL WINDOW: C@15 left window created when C@15 processed // FINAL WINDOW: C@15 left window created when C@15 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15),
// C@11 right window updated when C@16 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16),
// C@15 right window created when C@16 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16),
// FINAL WINDOW: C@16 left window created when C@16 processed // FINAL WINDOW: C@16 left window created when C@16 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16),
// FINAL WINDOW: C@11 right window updated when C@21 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21),
// C@15 right window updated when C@21 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21),
// C@16 right window created when C@21 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21),
// FINAL WINDOW: C@21 left window created when C@21 processed // FINAL WINDOW: C@21 left window created when C@21 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21),
// C@11 right window created when C@15 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15),
// C@11 right window updated when C@16 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16),
// FINAL WINDOW: C@11 right window updated when C@21 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21),
// FINAL WINDOW: C@23 left window created when C@23 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23),
// C@15 right window created when C@16 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16),
// C@15 right window updated when C@21 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21),
// FINAL WINDOW: C@15 right window updated when C@23 processed // FINAL WINDOW: C@15 right window updated when C@23 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23),
// C@16 right window created when C@21 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21),
// FINAL WINDOW: C@16 right window updated when C@23 processed // FINAL WINDOW: C@16 right window updated when C@23 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23),
// FINAL WINDOW: C@21 right window created when C@23 processed // FINAL WINDOW: C@21 right window created when C@23 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23),
// FINAL WINDOW: C@23 left window created when C@23 processed
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23),
// FINAL WINDOW: D@11 left window created when D@11 processed // FINAL WINDOW: D@11 left window created when D@11 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11),
// D@11 right window created when D@12 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12),
// FINAL WINDOW: D@12 left window created when D@12 processed // FINAL WINDOW: D@12 left window created when D@12 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12),
// FINAL WINDOW: D@29 left window created when D@29 processed // FINAL WINDOW: D@16 left window created when D@16 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16),
// D@11 right window created when D@12 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12),
// FINAL WINDOW: D@11 right window updated when D@16 processed // FINAL WINDOW: D@11 right window updated when D@16 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16),
// FINAL WINDOW: D@12 right window created when D@16 processed // FINAL WINDOW: D@12 right window created when D@16 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16),
// FINAL WINDOW: D@16 left window created when D@16 processed // FINAL WINDOW: D@29 left window created when D@29 processed
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16) new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29)),
), actual
supplier.theCapturedProcessor().processed()
); );
} }
@ -336,6 +375,14 @@ public class KStreamSlidingWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1"; final String topic1 = "topic1";
final String topic2 = "topic2"; final String topic2 = "topic2";
final WindowBytesStoreSupplier storeSupplier1 =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder1", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse1", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final WindowBytesStoreSupplier storeSupplier2 =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder2", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse2", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final KTable<Windowed<String>, String> table1 = builder final KTable<Windowed<String>, String> table1 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
@ -344,12 +391,8 @@ public class KStreamSlidingWindowAggregateTest {
.aggregate( .aggregate(
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) Materialized.as(storeSupplier1)
); );
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table1.toStream().process(supplier);
final KTable<Windowed<String>, String> table2 = builder final KTable<Windowed<String>, String> table2 = builder
.stream(topic2, Consumed.with(Serdes.String(), Serdes.String())) .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@ -357,8 +400,12 @@ public class KStreamSlidingWindowAggregateTest {
.aggregate( .aggregate(
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()) Materialized.as(storeSupplier2)
); );
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table1.toStream().process(supplier);
table2.toStream().process(supplier); table2.toStream().process(supplier);
table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier); table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier);
@ -545,6 +592,10 @@ public class KStreamSlidingWindowAggregateTest {
public void testEarlyRecordsLargeInput() { public void testEarlyRecordsLargeInput() {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic"; final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final KTable<Windowed<String>, String> table2 = builder final KTable<Windowed<String>, String> table2 = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String())) .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
@ -553,8 +604,9 @@ public class KStreamSlidingWindowAggregateTest {
.aggregate( .aggregate(
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) Materialized.as(storeSupplier)
); );
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table2.toStream().process(supplier); table2.toStream().process(supplier);
@ -572,52 +624,79 @@ public class KStreamSlidingWindowAggregateTest {
inputTopic1.pipeInput("E", "8", 2L); inputTopic1.pipeInput("E", "8", 2L);
inputTopic1.pipeInput("E", "9", 15L); inputTopic1.pipeInput("E", "9", 15L);
} }
final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
.thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
actual.sort(comparator);
assertEquals( assertEquals(
asList( asList(
// E@0 // E@0
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1", 0), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1", 0),
// E@5 // E@5
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3", 5), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3", 5),
// E@6 // E@6
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4", 6), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4", 6),
// E@3 // E@3
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2", 6), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2", 6),
//E@13
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13),
//E@10 //E@10
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5", 10), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5", 10),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5", 10),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13),
//E@4 //E@4
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5+7", 10),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5+7", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5+7", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5", 13),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7", 10), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7", 10),
//E@2 //E@2
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5+7+8", 10),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7+8", 10), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7+8", 10),
// E@5
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5),
// E@6
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6),
// E@3
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6),
//E@10
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5", 10),
//E@4
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5+7", 10),
//E@2
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5+7+8", 10),
//E@13
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13),
//E@10
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13),
//E@4
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5+7", 13),
// E@3
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6),
//E@13
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13),
//E@10
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13),
//E@4
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5+7", 13),
//E@4
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5", 13),
//E@15 //E@15
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5+9", 15), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5+9", 15),
// E@6
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6),
//E@13
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13),
//E@10
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13),
//E@15
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5+9", 15), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5+9", 15),
//E@13
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13),
//E@10
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13),
//E@15
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5+9", 15), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5+9", 15),
//E@10
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13),
//E@15
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6+9", 15), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6+9", 15),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15) //E@15
), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15)),
supplier.theCapturedProcessor().processed() actual
); );
} }
@ -648,17 +727,20 @@ public class KStreamSlidingWindowAggregateTest {
final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST; final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic"; final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90L))) .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90)))
.aggregate( .aggregate(
() -> "", MockInitializer.STRING_INIT,
MockAggregator.toStringInstance("+"), MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() Materialized.as(storeSupplier)
) )
.toStream() .toStream()
.map((key, value) -> new KeyValue<>(key.toString(), value))
.to("output"); .to("output");
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
@ -695,9 +777,9 @@ public class KStreamSlidingWindowAggregateTest {
// left window for k@15 // left window for k@15
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]" "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]"
)); ));
final TestOutputTopic<String, String> outputTopic = final TestOutputTopic<Windowed<String>, String> outputTopic =
driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer()); driver.createOutputTopic("output", new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), new StringDeserializer());
assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>("[k@190/200]", "+100", null, 200L))); assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>(new Windowed<>("k", new TimeWindow(190, 200)), "0+100", null, 200L)));
assertTrue(outputTopic.isEmpty()); assertTrue(outputTopic.isEmpty());
} }
} }
@ -708,6 +790,10 @@ public class KStreamSlidingWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1"; final String topic1 = "topic1";
final WindowBytesStoreSupplier storeSupplier =
inOrderIterator
? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
: Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
final KTable<Windowed<String>, String> table = builder final KTable<Windowed<String>, String> table = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
@ -724,7 +810,7 @@ public class KStreamSlidingWindowAggregateTest {
aggregate = String.valueOf(ch); aggregate = String.valueOf(ch);
return aggregate; return aggregate;
}, },
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) Materialized.as(storeSupplier)
); );
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table.toStream().process(supplier); table.toStream().process(supplier);
@ -877,4 +963,56 @@ public class KStreamSlidingWindowAggregateTest {
assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness); assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness); assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
} }
/**
 * In-memory window store for these tests that refuses every backward
 * (reverse-iterator) read by throwing {@link UnsupportedOperationException}.
 * Everything else is inherited unchanged from {@code InMemoryWindowStore}.
 *
 * <p>The tests select this store when {@code inOrderIterator} is set, so the
 * aggregation is exercised against a store that only offers forward iteration.
 */
private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

    /** Message carried by the exception raised from every backward read. */
    private static final String BACKWARD_UNSUPPORTED = "Backward fetch not supported here";

    /**
     * Passes all arguments straight through to the parent store's constructor.
     */
    InOrderMemoryWindowStore(final String name,
                             final long retentionPeriod,
                             final long windowSize,
                             final boolean retainDuplicates,
                             final String metricScope) {
        super(name, retentionPeriod, windowSize, retainDuplicates, metricScope);
    }

    /** Single-key backward fetch: always rejected. */
    @Override
    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key,
                                                     final long timeFrom,
                                                     final long timeTo) {
        throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
    }

    /** Key-range backward fetch: always rejected. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom,
                                                                   final Bytes keyTo,
                                                                   final long timeFrom,
                                                                   final long timeTo) {
        throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
    }

    /** Time-range backward fetch over all keys: always rejected. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom,
                                                                      final long timeTo) {
        throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
    }

    /** Backward scan of the whole store: always rejected. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
        throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
    }
}
/**
 * Store supplier for these tests whose {@link #get()} hands out an
 * {@code InOrderMemoryWindowStore} configured from this supplier's own
 * name, retention period, window size, duplicate setting and metrics scope.
 */
private static class InOrderMemoryWindowStoreSupplier extends InMemoryWindowBytesStoreSupplier {

    /**
     * Passes all arguments straight through to the parent supplier's constructor.
     */
    InOrderMemoryWindowStoreSupplier(final String name,
                                     final long retentionPeriod,
                                     final long windowSize,
                                     final boolean retainDuplicates) {
        super(name, retentionPeriod, windowSize, retainDuplicates);
    }

    /**
     * Builds a new store on each call, using the settings this supplier was created with.
     *
     * @return a freshly constructed {@code InOrderMemoryWindowStore}
     */
    @Override
    public WindowStore<Bytes, byte[]> get() {
        // Accessors are read in the same order as the constructor's argument list.
        final String storeName = name();
        final long retention = retentionPeriod();
        final long size = windowSize();
        final boolean duplicates = retainDuplicates();
        final String scope = metricsScope();
        return new InOrderMemoryWindowStore(storeName, retention, size, duplicates, scope);
    }
}
} }

View File

@ -20,9 +20,12 @@ package org.apache.kafka.streams.kstream.internals;
import static java.time.Duration.ofMillis; import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
@ -145,7 +148,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
} }
@Test @Test
public void slidingWindowAggregateTestStreamsTest() { public void slidingWindowAggregateStreamsTest() {
final KTable<Windowed<String>, String> customers = windowedCogroupedStream.aggregate( final KTable<Windowed<String>, String> customers = windowedCogroupedStream.aggregate(
MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String())); MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT); customers.toStream().to(OUTPUT);
@ -165,26 +168,36 @@ public class SlidingWindowedCogroupedKStreamImplTest {
testInputTopic.pipeInput("k2", "B", 504); testInputTopic.pipeInput("k2", "B", 504);
testInputTopic.pipeInput("k1", "B", 504); testInputTopic.pipeInput("k1", "B", 504);
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 500); final Set<TestRecord<String, String>> results = new HashSet<>();
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 500); while (!testOutputTopic.isEmpty()) {
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 501); final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord();
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A", 501); final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 502); realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A", 502); results.add(nonWindowedRecord);
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B", 503); }
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 503); final Set<TestRecord<String, String>> expected = new HashSet<>();
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A+B", 503); expected.add(new TestRecord<>("k1", "0+A", null, 500L));
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B", 503); expected.add(new TestRecord<>("k2", "0+A", null, 500L));
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 503); expected.add(new TestRecord<>("k2", "0+A", null, 501L));
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A+B", 503); expected.add(new TestRecord<>("k2", "0+A+A", null, 501L));
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B+B", 504); expected.add(new TestRecord<>("k1", "0+A", null, 502L));
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B+B", 504); expected.add(new TestRecord<>("k1", "0+A+A", null, 502L));
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 504); expected.add(new TestRecord<>("k1", "0+A+B", null, 503L));
assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A+B+B", 504); expected.add(new TestRecord<>("k1", "0+B", null, 503L));
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B+B", 504); expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L));
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B+B", 504); expected.add(new TestRecord<>("k2", "0+A+B", null, 503L));
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 504); expected.add(new TestRecord<>("k2", "0+B", null, 503L));
assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A+B+B", 504); expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L));
expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L));
expected.add(new TestRecord<>("k2", "0+B+B", null, 504L));
expected.add(new TestRecord<>("k2", "0+B", null, 504L));
expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+B+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L));
assertEquals(expected, results);
} }
} }

View File

@ -47,6 +47,7 @@ import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.Test; import org.junit.Test;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -493,39 +494,46 @@ public class SuppressScenarioTest {
inputTopic.pipeInput("k1", "v1", 7L); inputTopic.pipeInput("k1", "v1", 7L);
// final record to advance stream time and flush windows // final record to advance stream time and flush windows
inputTopic.pipeInput("k1", "v1", 90L); inputTopic.pipeInput("k1", "v1", 90L);
final Comparator<TestRecord<String, Long>> comparator =
Comparator.comparing((TestRecord<String, Long> o) -> o.getKey())
.thenComparing((TestRecord<String, Long> o) -> o.timestamp());
final List<TestRecord<String, Long>> actual = drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER);
actual.sort(comparator);
verify( verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), actual,
asList( asList(
// left window for k1@10 created when k1@10 is processed
new KeyValueTimestamp<>("[k1@5/10]", 1L, 10L),
// right window for k1@10 created when k1@11 is processed // right window for k1@10 created when k1@11 is processed
new KeyValueTimestamp<>("[k1@11/16]", 1L, 11L), new KeyValueTimestamp<>("[k1@11/16]", 1L, 11L),
// left window for k1@11 created when k1@11 is processed
new KeyValueTimestamp<>("[k1@6/11]", 2L, 11L),
// left window for k1@10 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@5/10]", 2L, 10L),
// left window for k1@11 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@6/11]", 3L, 11L),
// right window for k1@10 updated when k1@13 is processed // right window for k1@10 updated when k1@13 is processed
new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L), new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
// right window for k1@11 created when k1@13 is processed // right window for k1@11 created when k1@13 is processed
new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L), new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
// left window for k1@13 created when k1@13 is processed
new KeyValueTimestamp<>("[k1@8/13]", 4L, 13L),
// left window for k1@10 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@5/10]", 3L, 10L),
// left window for k1@11 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@6/11]", 4L, 11L),
// left window for k1@13 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
// left window for k1@24 created when k1@24 is processed // left window for k1@24 created when k1@24 is processed
new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L), new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L),
// left window for k1@10 created when k1@10 is processed
new KeyValueTimestamp<>("[k1@5/10]", 1L, 10L),
// left window for k1@10 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@5/10]", 2L, 10L),
// left window for k1@10 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@5/10]", 3L, 10L),
// left window for k1@10 updated when k1@5 is processed // left window for k1@10 updated when k1@5 is processed
new KeyValueTimestamp<>("[k1@5/10]", 4L, 10L), new KeyValueTimestamp<>("[k1@5/10]", 4L, 10L),
// left window for k1@10 updated when k1@7 is processed // left window for k1@10 updated when k1@7 is processed
new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L), new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
// left window for k1@11 created when k1@11 is processed
new KeyValueTimestamp<>("[k1@6/11]", 2L, 11L),
// left window for k1@11 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@6/11]", 3L, 11L),
// left window for k1@11 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@6/11]", 4L, 11L),
// left window for k1@11 updated when k1@7 is processed // left window for k1@11 updated when k1@7 is processed
new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L), new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
// left window for k1@13 created when k1@13 is processed
new KeyValueTimestamp<>("[k1@8/13]", 4L, 13L),
// left window for k1@13 updated when k1@10 is processed
new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
// left window for k1@90 created when k1@90 is processed
new KeyValueTimestamp<>("[k1@85/90]", 1L, 90L) new KeyValueTimestamp<>("[k1@85/90]", 1L, 90L)
) )
); );