KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size (#9253)

See KIP details and discussions here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size

Deprecates methods that allow users to skip setting a window size when one is needed. Adds a window size Streams config, window.size.ms, to allow the TimeWindowedDeserializer to calculate window end times.

Reviewers: Walker Carlson <wcarlson@confluent.io>, John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
leah 2021-02-01 18:20:35 -06:00 committed by GitHub
parent 62218a05d3
commit f5a2fbac6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 205 additions and 123 deletions

View File

@ -348,6 +348,11 @@
<td colspan="2">Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td> <td colspan="2">Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>86400000 milliseconds (1 day)</td> <td>86400000 milliseconds (1 day)</td>
</tr> </tr>
<tr class="row-even"><td>window.size.ms</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td>null</td>
</tr>
</tbody> </tbody>
</table> </table>
<div class="section" id="acceptable-recovery-lag"> <div class="section" id="acceptable-recovery-lag">

View File

@ -114,6 +114,13 @@
<code>StreamsUncaughtExceptionHandler</code> return <code>REPLACE_THREAD</code>. <code>StreamsUncaughtExceptionHandler</code> return <code>REPLACE_THREAD</code>.
When all stream threads are dead there is no automatic transition to ERROR as a new stream thread can be added. When all stream threads are dead there is no automatic transition to ERROR as a new stream thread can be added.
</p> </p>
<p>
The <code>TimeWindowedDeserializer</code> constructor <code>TimeWindowedDeserializer(final Deserializer inner)</code>
was deprecated to encourage users to properly set their window size through <code>TimeWindowedDeserializer(final Deserializer inner, final Long windowSize)</code>.
An additional Streams config, <code>window.size.ms</code>, was added for users who cannot set the window size through
the constructor, such as when using the console consumer. The window size must be set in exactly one of these two places:
setting it in both, or in neither, causes <code>configure()</code> to throw an <code>IllegalArgumentException</code>.
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size">KIP-659</a>
has more details.
</p>
<h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3> <h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
<p> <p>

View File

@ -99,7 +99,7 @@ public class TemperatureDemo {
.toStream() .toStream()
.filter((key, value) -> Integer.parseInt(value) > TEMPERATURE_THRESHOLD); .filter((key, value) -> Integer.parseInt(value) > TEMPERATURE_THRESHOLD);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, TEMPERATURE_WINDOW_SIZE);
// need to override key serde to Windowed<String> type // need to override key serde to Windowed<String> type
max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String())); max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String()));

View File

@ -541,6 +541,10 @@ public class StreamsConfig extends AbstractConfig {
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization"; public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default"; private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
/** {@code window.size.ms} */
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the deserializer in order to calculate window end times.";
/** {@code upgrade.from} */ /** {@code upgrade.from} */
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_CONFIG = "upgrade.from"; public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
@ -857,7 +861,12 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG, Type.LONG,
24 * 60 * 60 * 1000L, 24 * 60 * 60 * 1000L,
Importance.LOW, Importance.LOW,
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC); WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
.define(WINDOW_SIZE_MS_CONFIG,
Type.LONG,
null,
Importance.LOW,
WINDOW_SIZE_MS_DOC);
} }
// this is the list of configs for underlying clients // this is the list of configs for underlying clients

View File

@ -33,22 +33,22 @@ import java.util.Map;
*/ */
public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> { public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
private final Long windowSize; private Long windowSize;
private boolean isChangelogTopic; private boolean isChangelogTopic;
private Deserializer<T> inner; private Deserializer<T> inner;
// Default constructor needed by Kafka // Default constructor needed by Kafka
public TimeWindowedDeserializer() { public TimeWindowedDeserializer() {
this(null, Long.MAX_VALUE); this(null, null);
} }
// TODO: fix this part as last bits of KAFKA-4468 @Deprecated
public TimeWindowedDeserializer(final Deserializer<T> inner) { public TimeWindowedDeserializer(final Deserializer<T> inner) {
this(inner, Long.MAX_VALUE); this(inner, Long.MAX_VALUE);
} }
public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) { public TimeWindowedDeserializer(final Deserializer<T> inner, final Long windowSize) {
this.inner = inner; this.inner = inner;
this.windowSize = windowSize; this.windowSize = windowSize;
this.isChangelogTopic = false; this.isChangelogTopic = false;
@ -61,6 +61,21 @@ public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void configure(final Map<String, ?> configs, final boolean isKey) { public void configure(final Map<String, ?> configs, final boolean isKey) {
//check to see if the window size config is set and the window size is already set from the constructor
final Long configWindowSize;
if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof String) {
configWindowSize = Long.parseLong((String) configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG));
} else {
configWindowSize = (Long) configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG);
}
if (windowSize != null && configWindowSize != null) {
throw new IllegalArgumentException("Window size should not be set in both the time windowed deserializer constructor and the window.size.ms config");
} else if (windowSize == null && configWindowSize == null) {
throw new IllegalArgumentException("Window size needs to be set either through the time windowed deserializer " +
"constructor or the window.size.ms config but not both");
} else {
windowSize = windowSize == null ? configWindowSize : windowSize;
}
if (inner == null) { if (inner == null) {
final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS; final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
final String value = (String) configs.get(propertyName); final String value = (String) configs.get(propertyName);

View File

@ -29,6 +29,7 @@ public class WindowedSerdes {
super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>()); super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>());
} }
@Deprecated
public TimeWindowedSerde(final Serde<T> inner) { public TimeWindowedSerde(final Serde<T> inner) {
super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer())); super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer()));
} }
@ -60,6 +61,7 @@ public class WindowedSerdes {
/** /**
* Construct a {@code TimeWindowedSerde} object for the specified inner class type. * Construct a {@code TimeWindowedSerde} object for the specified inner class type.
*/ */
@Deprecated
static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type) { static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T> type) {
return new TimeWindowedSerde<>(Serdes.serdeFrom(type)); return new TimeWindowedSerde<>(Serdes.serdeFrom(type));
} }

View File

@ -207,7 +207,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp); produceMessages(secondBatchTimestamp);
produceMessages(secondBatchTimestamp); produceMessages(secondBatchTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
groupedStream groupedStream
.windowedBy(TimeWindows.of(ofMillis(500L))) .windowedBy(TimeWindows.of(ofMillis(500L)))
.reduce(reducer) .reduce(reducer)
@ -235,25 +235,27 @@ public class KStreamAggregationIntegrationTest {
.thenComparing(KeyValueTimestamp::value); .thenComparing(KeyValueTimestamp::value);
windowedOutput.sort(comparator); windowedOutput.sort(comparator);
final long firstBatchWindow = firstBatchTimestamp / 500 * 500; final long firstBatchWindowStart = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500; final long firstBatchWindowEnd = firstBatchWindowStart + 500;
final long secondBatchWindowStart = secondBatchTimestamp / 500 * 500;
final long secondBatchWindowEnd = secondBatchWindowStart + 500;
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList( final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "A", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A:A", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "B", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B:B", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "C", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C:C", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "D", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D:D", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "E", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E", secondBatchTimestamp) new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E:E", secondBatchTimestamp)
); );
assertThat(windowedOutput, is(expectResult)); assertThat(windowedOutput, is(expectResult));
@ -314,7 +316,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp); produceMessages(secondTimestamp);
produceMessages(secondTimestamp); produceMessages(secondTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
groupedStream.windowedBy(TimeWindows.of(ofMillis(500L))) groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
.aggregate( .aggregate(
initializer, initializer,
@ -327,7 +329,7 @@ public class KStreamAggregationIntegrationTest {
startStreams(); startStreams();
final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp( final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(), new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
new IntegerDeserializer(), new IntegerDeserializer(),
String.class, String.class,
15); 15);
@ -345,25 +347,27 @@ public class KStreamAggregationIntegrationTest {
.thenComparingInt(KeyValueTimestamp::value); .thenComparingInt(KeyValueTimestamp::value);
windowedMessages.sort(comparator); windowedMessages.sort(comparator);
final long firstWindow = firstTimestamp / 500 * 500; final long firstWindowStart = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500; final long firstWindowEnd = firstWindowStart + 500;
final long secondWindowStart = secondTimestamp / 500 * 500;
final long secondWindowEnd = secondWindowStart + 500;
final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList( final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp)); new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp));
assertThat(windowedMessages, is(expectResult)); assertThat(windowedMessages, is(expectResult));
@ -471,7 +475,7 @@ public class KStreamAggregationIntegrationTest {
final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L; final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
produceMessages(thirdBatchTimestamp); produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
groupedStream groupedStream
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L))) .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
.reduce(reducer) .reduce(reducer)
@ -499,52 +503,56 @@ public class KStreamAggregationIntegrationTest {
windowedOutput.sort(comparator); windowedOutput.sort(comparator);
final long firstBatchLeftWindowStart = firstBatchTimestamp - timeDifference; final long firstBatchLeftWindowStart = firstBatchTimestamp - timeDifference;
final long firstBatchLeftWindowEnd = firstBatchLeftWindowStart + timeDifference;
final long firstBatchRightWindowStart = firstBatchTimestamp + 1; final long firstBatchRightWindowStart = firstBatchTimestamp + 1;
final long firstBatchRightWindowEnd = firstBatchRightWindowStart + timeDifference;
final long secondBatchLeftWindowStart = secondBatchTimestamp - timeDifference; final long secondBatchLeftWindowStart = secondBatchTimestamp - timeDifference;
final long secondBatchLeftWindowEnd = secondBatchLeftWindowStart + timeDifference;
final long secondBatchRightWindowStart = secondBatchTimestamp + 1; final long secondBatchRightWindowStart = secondBatchTimestamp + 1;
final long secondBatchRightWindowEnd = secondBatchRightWindowStart + timeDifference;
final long thirdBatchLeftWindowStart = thirdBatchTimestamp - timeDifference; final long thirdBatchLeftWindowStart = thirdBatchTimestamp - timeDifference;
final long thirdBatchLeftWindowEnd = thirdBatchLeftWindowStart + timeDifference;
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList( final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
// A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed // A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "A", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "A", firstBatchTimestamp),
// A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed // A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "A", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A", secondBatchTimestamp),
// A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed // A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "A", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "A", thirdBatchTimestamp),
// A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed // A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "A:A", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "A:A", secondBatchTimestamp),
// A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed // A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "A:A", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A:A", thirdBatchTimestamp),
// A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed // A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "A:A:A", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "A:A:A", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "B", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "B", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "B", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "B", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "B", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "B:B", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "B:B", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "B:B", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B:B", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "B:B:B", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "B:B:B", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "C", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "C", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "C", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "C", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "C", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "C:C", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "C:C", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "C:C", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C:C", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "C:C:C", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "C:C:C", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "D", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "D", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "D", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "D", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "D", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "D:D", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "D:D", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "D:D", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D:D", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "D:D:D", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "D:D:D", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "E", firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "E", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "E", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "E", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "E", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "E:E", secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "E:E", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "E:E", thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E:E", thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "E:E:E", thirdBatchTimestamp) new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "E:E:E", thirdBatchTimestamp)
); );
assertThat(windowedOutput, is(expectResult)); assertThat(windowedOutput, is(expectResult));
@ -571,7 +579,7 @@ public class KStreamAggregationIntegrationTest {
final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L; final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
produceMessages(thirdBatchTimestamp); produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMinutes(5))) groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMinutes(5)))
.aggregate( .aggregate(
initializer, initializer,
@ -602,50 +610,57 @@ public class KStreamAggregationIntegrationTest {
.thenComparingInt(KeyValueTimestamp::value); .thenComparingInt(KeyValueTimestamp::value);
windowedMessages.sort(comparator); windowedMessages.sort(comparator);
final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference; final long firstBatchLeftWindowStart = firstBatchTimestamp - timeDifference;
final long firstBatchRightWindow = firstBatchTimestamp + 1; final long firstBatchLeftWindowEnd = firstBatchLeftWindowStart + timeDifference;
final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference; final long firstBatchRightWindowStart = firstBatchTimestamp + 1;
final long secondBatchRightWindow = secondBatchTimestamp + 1; final long firstBatchRightWindowEnd = firstBatchRightWindowStart + timeDifference;
final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
final long secondBatchLeftWindowStart = secondBatchTimestamp - timeDifference;
final long secondBatchLeftWindowEnd = secondBatchLeftWindowStart + timeDifference;
final long secondBatchRightWindowStart = secondBatchTimestamp + 1;
final long secondBatchRightWindowEnd = secondBatchRightWindowStart + timeDifference;
final long thirdBatchLeftWindowStart = thirdBatchTimestamp - timeDifference;
final long thirdBatchLeftWindowEnd = thirdBatchLeftWindowStart + timeDifference;
final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList( final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList(
// A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed // A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, firstBatchTimestamp),
// A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed // A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, secondBatchTimestamp),
// A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed // A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, thirdBatchTimestamp),
// A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed // A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, secondBatchTimestamp),
// A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed // A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, thirdBatchTimestamp),
// A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed // A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp), new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, thirdBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp) new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, thirdBatchTimestamp)
); );
assertThat(windowedMessages, is(expectResult)); assertThat(windowedMessages, is(expectResult));
@ -1030,6 +1045,7 @@ public class KStreamAggregationIntegrationTest {
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) { if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName()); Serdes.serdeFrom(innerClass).getClass().getName());
@ -1052,6 +1068,7 @@ public class KStreamAggregationIntegrationTest {
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) { if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName()); Serdes.serdeFrom(innerClass).getClass().getName());
@ -1085,7 +1102,8 @@ public class KStreamAggregationIntegrationTest {
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(), "--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(), "--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
"--property", "key.separator=" + keySeparator, "--property", "key.separator=" + keySeparator,
"--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName() "--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
"--property", "key.deserializer.window.size.ms=500",
}; };
ConsoleConsumer.messageCount_$eq(0); //reset the message count ConsoleConsumer.messageCount_$eq(0); //reset the message count

View File

@ -27,7 +27,10 @@ import org.junit.Test;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class TimeWindowedDeserializerTest { public class TimeWindowedDeserializerTest {
@ -47,6 +50,7 @@ public class TimeWindowedDeserializerTest {
final Deserializer<?> inner = timeWindowedDeserializer.innerDeserializer(); final Deserializer<?> inner = timeWindowedDeserializer.innerDeserializer();
assertNotNull("Inner deserializer should be not null", inner); assertNotNull("Inner deserializer should be not null", inner);
assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer); assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
assertThat(timeWindowedDeserializer.getWindowSize(), is(5000000L));
} }
@Test @Test
@ -55,5 +59,26 @@ public class TimeWindowedDeserializerTest {
final Deserializer<?> inner = timeWindowedDeserializer.innerDeserializer(); final Deserializer<?> inner = timeWindowedDeserializer.innerDeserializer();
assertNotNull("Inner deserializer should be not null", inner); assertNotNull("Inner deserializer should be not null", inner);
assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer); assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer);
assertThat(timeWindowedDeserializer.getWindowSize(), is(5000000L));
}
@Test
public void shouldSetWindowSizeThroughConfigs() {
    // The deserializer is created with the no-arg constructor, so the only
    // window size it is given comes from the config map below.
    final TimeWindowedDeserializer<?> configuredDeserializer = new TimeWindowedDeserializer<>();
    props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");

    configuredDeserializer.configure(props, false);

    // The string config value "500" is expected to be reported as the long 500.
    assertThat(configuredDeserializer.getWindowSize(), is(500L));
}
@Test
public void shouldThrowErrorIfWindowSizeSetInConfigsAndConstructor() {
    // Put a window size into the configs, then configure the shared
    // timeWindowedDeserializer fixture (declared outside this view; presumably
    // already built with a window size, per the test name -- confirm).
    props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");

    // Configuring must fail with IllegalArgumentException.
    assertThrows(
        IllegalArgumentException.class,
        () -> {
            timeWindowedDeserializer.configure(props, false);
        }
    );
}
@Test
public void shouldThrowErrorIfWindowSizeIsNotSet() {
// No window size is passed to the constructor, and this test adds none to props
// before configuring (props is set up outside this view -- assumed to carry no window size).
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
// configure() is expected to fail with IllegalArgumentException.
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
} }
} }

View File

@ -35,7 +35,7 @@ public class WindowedSerdesTest {
@Test @Test
public void shouldWrapForTimeWindowedSerde() { public void shouldWrapForTimeWindowedSerde() {
final Serde<Windowed<String>> serde = WindowedSerdes.timeWindowedSerdeFrom(String.class); final Serde<Windowed<String>> serde = WindowedSerdes.timeWindowedSerdeFrom(String.class, Long.MAX_VALUE);
assertTrue(serde.serializer() instanceof TimeWindowedSerializer); assertTrue(serde.serializer() instanceof TimeWindowedSerializer);
assertTrue(serde.deserializer() instanceof TimeWindowedDeserializer); assertTrue(serde.deserializer() instanceof TimeWindowedDeserializer);
assertTrue(((TimeWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer); assertTrue(((TimeWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer);
@ -54,7 +54,7 @@ public class WindowedSerdesTest {
@Test @Test
public void testTimeWindowSerdeFrom() { public void testTimeWindowSerdeFrom() {
final Windowed<Integer> timeWindowed = new Windowed<>(10, new TimeWindow(0, Long.MAX_VALUE)); final Windowed<Integer> timeWindowed = new Windowed<>(10, new TimeWindow(0, Long.MAX_VALUE));
final Serde<Windowed<Integer>> timeWindowedSerde = WindowedSerdes.timeWindowedSerdeFrom(Integer.class); final Serde<Windowed<Integer>> timeWindowedSerde = WindowedSerdes.timeWindowedSerdeFrom(Integer.class, Long.MAX_VALUE);
final byte[] bytes = timeWindowedSerde.serializer().serialize(topic, timeWindowed); final byte[] bytes = timeWindowedSerde.serializer().serialize(topic, timeWindowed);
final Windowed<Integer> windowed = timeWindowedSerde.deserializer().deserialize(topic, bytes); final Windowed<Integer> windowed = timeWindowedSerde.deserializer().deserialize(topic, bytes);
Assert.assertEquals(timeWindowed, windowed); Assert.assertEquals(timeWindowed, windowed);

View File

@ -53,6 +53,7 @@ import org.junit.Test;
public class TimeWindowedCogroupedKStreamImplTest { public class TimeWindowedCogroupedKStreamImplTest {
private static final Long WINDOW_SIZE = 500L;
private static final String TOPIC = "topic"; private static final String TOPIC = "topic";
private static final String TOPIC2 = "topic2"; private static final String TOPIC2 = "topic2";
private static final String OUTPUT = "output"; private static final String OUTPUT = "output";
@ -77,7 +78,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER); .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
windowedCogroupedStream = cogroupedStream.windowedBy(TimeWindows.of(ofMillis(500L))); windowedCogroupedStream = cogroupedStream.windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)));
} }
@Test @Test
@ -128,7 +129,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
.with(Serdes.String(), Serdes.String())); .with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.windowedBy(TimeWindows.of(ofMillis(500L))) .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
.aggregate(MockInitializer.STRING_INIT, Named.as("foo")); .aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
assertThat(builder.build().describe().toString(), equalTo( assertThat(builder.build().describe().toString(), equalTo(
@ -155,7 +156,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic = driver.createInputTopic( final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer()); TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic( final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 0); testInputTopic.pipeInput("k1", "A", 0);
testInputTopic.pipeInput("k2", "A", 0); testInputTopic.pipeInput("k2", "A", 0);
@ -191,7 +192,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic2 = driver.createInputTopic( final TestInputTopic<String, String> testInputTopic2 = driver.createInputTopic(
TOPIC2, new StringSerializer(), new StringSerializer()); TOPIC2, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic( final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 0); testInputTopic.pipeInput("k1", "A", 0);
testInputTopic.pipeInput("k2", "A", 0); testInputTopic.pipeInput("k2", "A", 0);
@ -226,7 +227,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic = driver.createInputTopic( final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer()); TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic( final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 0); testInputTopic.pipeInput("k1", "A", 0);
testInputTopic.pipeInput("k2", "A", 499); testInputTopic.pipeInput("k2", "A", 499);
@ -252,7 +253,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic = driver.createInputTopic( final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer()); TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic( final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 0); testInputTopic.pipeInput("k1", "A", 0);
testInputTopic.pipeInput("k2", "A", 0); testInputTopic.pipeInput("k2", "A", 0);
@ -287,7 +288,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic2 = driver.createInputTopic( final TestInputTopic<String, String> testInputTopic2 = driver.createInputTopic(
TOPIC2, new StringSerializer(), new StringSerializer()); TOPIC2, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic( final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 0); testInputTopic.pipeInput("k1", "A", 0);
testInputTopic.pipeInput("k2", "A", 0); testInputTopic.pipeInput("k2", "A", 0);

View File

@ -51,7 +51,7 @@ public class WindowKeySchemaTest {
final private Window window = new TimeWindow(startTime, endTime); final private Window window = new TimeWindow(startTime, endTime);
final private Windowed<String> windowedKey = new Windowed<>(key, window); final private Windowed<String> windowedKey = new Windowed<>(key, window);
final private WindowKeySchema windowKeySchema = new WindowKeySchema(); final private WindowKeySchema windowKeySchema = new WindowKeySchema();
final private Serde<Windowed<String>> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde); final private Serde<Windowed<String>> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde, Long.MAX_VALUE);
final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray()); final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray());
@Test @Test