KAFKA-5853; implement WindowedKStream

Add the `WindowedKStream` interface and implementation of methods that don't require `Materialized`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3809 from dguy/kgrouped-stream-windowed-by
Damian Guy 2017-09-08 16:49:18 +01:00
parent beeed86600
commit e16b9143df
9 changed files with 544 additions and 46 deletions

View File

@ -1175,9 +1175,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
Once records are grouped by key via <code>groupByKey</code> or <code>groupBy</code> -- and
thus represented as either a <code>KGroupedStream</code> or a
<code>KGroupedTable</code> -- they can be aggregated via an operation such as
<code>reduce</code>. Aggregations are <i>key-based</i> operations, i.e.
they always operate over records (notably record values) <i>of the same key</i>. You may
choose to perform aggregations on
<code>reduce</code>.
For windowed aggregations use <code>windowedBy(Windows).reduce(Reducer)</code>.
Aggregations are <i>key-based</i> operations, i.e., they always operate over records (notably record values) <i>of the same key</i>.
You may choose to perform aggregations on
<a href="#streams_dsl_windowing">windowed</a> or non-windowed data.
</p>
<table class="data-table" border="1">
@ -1205,20 +1206,20 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
Several variants of <code>aggregate</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
KGroupedStream&lt;byte[], String&gt; groupedStream = ...;
KGroupedTable&lt;byte[], String&gt; groupedTable = ...;
KGroupedStream&lt;Bytes, String&gt; groupedStream = ...;
KGroupedTable&lt;Bytes, String&gt; groupedTable = ...;
// Java 8+ examples, using lambda expressions
// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedStream = groupedStream.aggregate(
KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
Serdes.Long(), /* serde for aggregate value */
"aggregated-stream-store" /* state store name */);
// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
(aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
@ -1226,19 +1227,26 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
"aggregated-table-store" /* state store name */);
// Windowed aggregation
KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.aggregate(() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* aggregator */
Serdes.Long()); /* serde for aggregate value */
// Java 7 examples
// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedStream = groupedStream.aggregate(
KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
@Override
public Long apply(byte[] aggKey, String newValue, Long aggValue) {
public Long apply(Bytes aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
@ -1246,27 +1254,44 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
"aggregated-stream-store");
// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
@Override
public Long apply(byte[] aggKey, String newValue, Long aggValue) {
public Long apply(Bytes aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* subtractor */
new Aggregator&lt;Bytes, String, Long&gt;() { /* subtractor */
@Override
public Long apply(byte[] aggKey, String oldValue, Long aggValue) {
public Long apply(Bytes aggKey, String oldValue, Long aggValue) {
return aggValue - oldValue.length();
}
},
Serdes.Long(),
"aggregated-table-store");
// Windowed aggregation
KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
@Override
public Long apply(Bytes aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
Serdes.Long());
</pre>
<p>
Detailed behavior of <code>KGroupedStream</code>:

View File

@ -85,6 +85,12 @@
An example using this new API is shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
</p>
<p>
Windowed aggregations have moved from <code>KGroupedStream</code> to <code>WindowedKStream</code>.
You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
Note: the previous aggregate functions on <code>KGroupedStream</code> still work, but have been deprecated.
</p>
<p>
The Processor API was extended to allow users to schedule <code>punctuate</code> functions either based on data-driven <b>stream time</b> or wall-clock time.
As a result, the original <code>ProcessorContext#schedule</code> is deprecated with a new overloaded function that accepts a user customizable <code>Punctuator</code> callback interface, which triggers its <code>punctuate</code> API method periodically based on the <code>PunctuationType</code>.

View File

@ -223,7 +223,9 @@ public interface KGroupedStream<K, V> {
* @param windows the specification of the aggregation {@link Windows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
* @deprecated use {@link #windowedBy(Windows)}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows);
/**
@ -619,7 +621,9 @@ public interface KGroupedStream<K, V> {
* @param windows the specification of the aggregation {@link Windows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows)}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows);
@ -1112,7 +1116,9 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows)}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
@ -1333,4 +1339,12 @@ public interface KGroupedStream<K, V> {
final Serde<T> aggValueSerde,
final StateStoreSupplier<SessionStore> storeSupplier);
/**
* Create a new {@link WindowedKStream} instance that can be used to perform windowed aggregations.
* @param windows the specification of the aggregation {@link Windows}
* @param <W> the window type
* @return an instance of {@link WindowedKStream}
*/
<W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows);
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueStore;
/**
* {@code WindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
* It is an intermediate representation of a {@link KStream} in order to apply a windowed aggregation operation on the original
* {@link KStream} records.
* <p>
* It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
* new (partitioned) windows resulting in a windowed {@link KTable}
* (a <em>windowed</em> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
* <p>
* A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)}.
*
* @param <K> Type of keys
* @param <V> Type of values
* @see KStream
* @see KGroupedStream
*/
public interface WindowedKStream<K, V> {
/**
* Count the number of records in this stream by the grouped key and the defined windows.
* Records with {@code null} key or value are ignored.
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
* materialized view) under an internally generated store name.
* Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* Note that the internal store name may not be queryable through Interactive Queries.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
KTable<Windowed<K>, Long> count();
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* under an internally generated store name.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to
* provide an initial intermediate aggregation result that is used to process the first record.
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Serde)} can be used to compute aggregate functions like
* count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* Note that the internal store name may not be queryable through Interactive Queries.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param aggValueSerde aggregate value serdes for materializing the aggregated table;
* if not specified, the default serdes defined in the configs will be used
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde);
/**
* Combine the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* under an internally generated store name.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
* aggregate and the record's value.
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
* Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
}
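
The Javadoc above distinguishes `aggregate` (an initializer supplies a starting value, then the aggregator runs for every record) from `reduce` (the first record's value is taken as-is, then the reducer runs). The following is a minimal plain-Java sketch of those two rules for the records of a single key and window. It is not Kafka Streams code: the class `AggregateVsReduceSketch` and its methods are hypothetical, it uses `java.util.function` types instead of Kafka's `Initializer`, `Aggregator`, and `Reducer`, and it does not model null handling, caching, or state stores.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical model of the documented semantics; not part of Kafka Streams.
public class AggregateVsReduceSketch {

    // aggregate: the initializer is applied once before the first record,
    // then the aggregator is applied to every record value.
    static <V, A> A aggregate(Supplier<A> initializer,
                              BiFunction<A, V, A> aggregator,
                              List<V> values) {
        if (values.isEmpty()) {
            return null; // no record seen, so no aggregate exists yet
        }
        A aggregate = initializer.get();
        for (V value : values) {
            aggregate = aggregator.apply(aggregate, value);
        }
        return aggregate;
    }

    // reduce: with no current aggregate the value is used as-is;
    // afterwards the reducer combines the aggregate with each new value.
    static <V> V reduce(BinaryOperator<V> reducer, List<V> values) {
        V aggregate = null;
        for (V value : values) {
            aggregate = (aggregate == null) ? value : reducer.apply(aggregate, value);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        // Values of one key that fall into the same window.
        List<String> window = Arrays.asList("1", "2");

        String aggregated = aggregate(() -> "0", (agg, v) -> agg + "+" + v, window);
        String reduced = reduce((agg, v) -> agg + "+" + v, window);
        // count expressed as an aggregate that changes the value type to Long
        Long counted = aggregate(() -> 0L, (agg, v) -> agg + 1L, window);

        System.out.println(aggregated); // 0+1+2
        System.out.println(reduced);    // 1+2
        System.out.println(counted);    // 2
    }
}
```

The third call mirrors how `count()` can be expressed through `aggregate`, which is also why `aggregate` may return a type different from the input values while `reduce` may not.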

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedKStream;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@ -39,8 +40,8 @@ import java.util.Set;
class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
static final String REDUCE_NAME = "KSTREAM-REDUCE-";
static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
private final Serde<K> keySerde;
private final Serde<V> valSerde;
@ -102,7 +103,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows) {
return reduce(reducer, windows, (String) null);
return windowedBy(windows).reduce(reducer);
}
@SuppressWarnings("unchecked")
@ -164,7 +165,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Aggregator<? super K, ? super V, T> aggregator,
final Windows<W> windows,
final Serde<T> aggValueSerde) {
return aggregate(initializer, aggregator, windows, aggValueSerde, null);
return windowedBy(windows).aggregate(initializer, aggregator, aggValueSerde);
}
@SuppressWarnings("unchecked")
@ -219,7 +220,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
return count(windows, (String) null);
return windowedBy(windows).count();
}
@Override
@ -291,6 +292,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
@Override
public <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows) {
return new WindowedKStreamImpl<>(windows,
builder,
sourceNodes,
name,
keySerde,
valSerde,
repartitionRequired);
}
@SuppressWarnings("unchecked")
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
determineIsQueryable(queryableStoreName);

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedKStream;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
private final Windows<W> windows;
private final boolean repartitionRequired;
private final Serde<K> keySerde;
private final Serde<V> valSerde;
WindowedKStreamImpl(final Windows<W> windows,
final InternalStreamsBuilder builder,
final Set<String> sourceNodes,
final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final boolean repartitionRequired) {
super(builder, name, sourceNodes);
Objects.requireNonNull(windows, "windows can't be null");
this.valSerde = valSerde;
this.keySerde = keySerde;
this.repartitionRequired = repartitionRequired;
this.windows = windows;
}
@Override
public KTable<Windowed<K>, Long> count() {
return aggregate(
new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
}, new Aggregator<K, V, Long>() {
@Override
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
},
Serdes.Long());
}
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
final String aggFunctionName = builder.newName(AGGREGATE_NAME);
final String storeName = builder.newStoreName(AGGREGATE_NAME);
return doAggregate(aggValueSerde,
aggFunctionName,
storeName,
new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator));
}
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
Objects.requireNonNull(reducer, "reducer can't be null");
final String storeName = builder.newStoreName(REDUCE_NAME);
return doAggregate(valSerde,
builder.newName(REDUCE_NAME),
storeName,
new KStreamWindowReduce<>(windows, storeName, reducer));
}
@SuppressWarnings("unchecked")
private <VR> KTable<Windowed<K>, VR> doAggregate(final Serde<VR> aggValueSerde,
final String aggFunctionName,
final String storeName,
final KStreamAggProcessorSupplier aggSupplier) {
final String sourceName = repartitionIfRequired(storeName);
final StoreBuilder<WindowStore<K, VR>> storeBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(
storeName,
windows.maintainMs(),
windows.segments,
windows.size(),
false),
keySerde,
aggValueSerde)
.withCachingEnabled();
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggSupplier, sourceName);
builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
return new KTableImpl<>(
builder,
aggFunctionName,
aggSupplier,
sourceName.equals(this.name) ? sourceNodes
: Collections.singleton(sourceName),
storeName,
false);
}
/**
* @return the new sourceName if repartitioned. Otherwise the name of this stream
*/
private String repartitionIfRequired(final String queryableStoreName) {
if (!repartitionRequired) {
return this.name;
}
return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
}
}

View File

@ -202,7 +202,8 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp);
groupedStream
.reduce(reducer, TimeWindows.of(500L), "reduce-time-windows")
.windowedBy(TimeWindows.of(500L))
.reduce(reducer)
.toStream(new KeyValueMapper<Windowed<String>, String, String>() {
@Override
public String apply(final Windowed<String> windowedKey, final String value) {
@ -211,6 +212,7 @@ public class KStreamAggregationIntegrationTest {
})
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
startStreams();
final List<KeyValue<String, String>> windowedOutput = receiveMessages(
@ -302,18 +304,18 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
produceMessages(secondTimestamp);
groupedStream.aggregate(
groupedStream.windowedBy(TimeWindows.of(500L))
.aggregate(
initializer,
aggregator,
TimeWindows.of(500L),
Serdes.Integer(), "aggregate-by-key-windowed")
Serdes.Integer())
.toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
@Override
public String apply(final Windowed<String> windowedKey, final Integer value) {
return windowedKey.key() + "@" + windowedKey.window().start();
}
})
.to(Serdes.String(), Serdes.Integer(), outputTopic);
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
startStreams();
@ -414,13 +416,14 @@ public class KStreamAggregationIntegrationTest {
produceMessages(timestamp);
stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
.count(TimeWindows.of(500L), "count-windows")
.windowedBy(TimeWindows.of(500L))
.count()
.toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
@Override
public String apply(final Windowed<Integer> windowedKey, final Long value) {
return windowedKey.key() + "@" + windowedKey.window().start();
}
}).to(Serdes.String(), Serdes.Long(), outputTopic);
}).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
startStreams();

View File

@ -525,6 +525,7 @@ public class KGroupedStreamImplTest {
doCountWindowed(results);
}
@SuppressWarnings("deprecation")
@Test
public void shouldCountWindowedWithInternalStoreName() throws Exception {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedKStream;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
public class WindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private WindowedKStream<String, String> windowedStream;
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(500L));
}
@Test
public void shouldCountWindowed() {
final Map<Windowed<String>, Long> results = new HashMap<>();
windowedStream.count()
.toStream()
.foreach(new ForeachAction<Windowed<String>, Long>() {
@Override
public void apply(final Windowed<String> key, final Long value) {
results.put(key, value);
}
});
processData();
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
}
@Test
public void shouldReduceWindowed() {
final Map<Windowed<String>, String> results = new HashMap<>();
windowedStream.reduce(MockReducer.STRING_ADDER)
.toStream()
.foreach(new ForeachAction<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String value) {
results.put(key, value);
}
});
processData();
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
}
@Test
public void shouldAggregateWindowed() {
final Map<Windowed<String>, String> results = new HashMap<>();
windowedStream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Serdes.String())
.toStream()
.foreach(new ForeachAction<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String value) {
results.put(key, value);
}
});
processData();
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
windowedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
windowedStream.reduce(null);
}
private void processData() {
driver.setUp(builder, TestUtils.tempDirectory());
driver.setTime(10);
driver.process(TOPIC, "1", "1");
driver.setTime(15);
driver.process(TOPIC, "1", "2");
driver.setTime(500);
driver.process(TOPIC, "1", "3");
driver.process(TOPIC, "2", "1");
driver.flushState();
}
}
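
The counts asserted in `shouldCountWindowed` follow from how records map onto 500 ms tumbling windows. Below is a small plain-Java sketch that reproduces them under the assumption that a tumbling window starts at the largest multiple of the window size not exceeding the record timestamp (for non-negative timestamps). The class `TumblingWindowCountSketch` and its string-based window key are hypothetical and purely illustrative; this is not how `WindowedKStreamImpl` or `TimeWindows` are implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of tumbling-window counting; not part of Kafka Streams.
public class TumblingWindowCountSketch {

    // Start of the tumbling window containing the timestamp
    // (assumes timestampMs >= 0 and sizeMs > 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Illustrative stand-in for a windowed key: "key@start/end".
    static String windowedKey(String key, long timestampMs, long sizeMs) {
        long start = windowStart(timestampMs, sizeMs);
        return key + "@" + start + "/" + (start + sizeMs);
    }

    // Count records per (key, window) pair.
    static Map<String, Long> count(String[] keys, long[] timestamps, long sizeMs) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            counts.merge(windowedKey(keys[i], timestamps[i], sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same keys and timestamps as processData() in the test above.
        String[] keys = {"1", "1", "1", "2"};
        long[] timestamps = {10L, 15L, 500L, 500L};
        System.out.println(count(keys, timestamps, 500L));
        // prints {1@0/500=2, 1@500/1000=1, 2@500/1000=1}
    }
}
```

The record with timestamp 500 falls into the window starting at 500 rather than the one starting at 0, which is why key "1" has a count of 2 in the first window and 1 in the second.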