diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index a064a5de39c..842325b528e 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1383,65 +1383,72 @@ Note that in the WordCountProcessor
implementation, users need to r
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
- KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.aggregate(
- () -> 0L, /* initializer */
- (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
- TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
- Serdes.Long(), /* serde for aggregate value */
- "time-windowed-aggregated-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
+ .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
+ .aggregate(
+ () -> 0L, /* initializer */
+ (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
+ Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
+ .withValueSerde(Serdes.Long())); /* serde for aggregate value */
+
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
- KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.aggregate(
- () -> 0L, /* initializer */
- (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
- (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
- SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
- Serdes.Long(), /* serde for aggregate value */
- "sessionized-aggregated-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream
+ .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
+ .aggregate(
+ () -> 0L, /* initializer */
+ (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
+ (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
+ Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */
+ .withValueSerde(Serdes.Long())); /* serde for aggregate value */
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
- KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.aggregate(
- new Initializer<Long>() { /* initializer */
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<String, Long, Long>() { /* adder */
- @Override
- public Long apply(String aggKey, Long newValue, Long aggValue) {
- return aggValue + newValue;
- }
- },
- TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
- Serdes.Long(), /* serde for aggregate value */
- "time-windowed-aggregated-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
+ .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
+ .aggregate(
+ new Initializer<Long>() { /* initializer */
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<String, Long, Long>() { /* adder */
+ @Override
+ public Long apply(String aggKey, Long newValue, Long aggValue) {
+ return aggValue + newValue;
+ }
+ },
+ Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
+ .withValueSerde(Serdes.Long()) /* serde for aggregate value */
+ );
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
- KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.aggregate(
- new Initializer<Long>() { /* initializer */
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<String, Long, Long>() { /* adder */
- @Override
- public Long apply(String aggKey, Long newValue, Long aggValue) {
- return aggValue + newValue;
- }
- },
- new Merger<String, Long>() { /* session merger */
- @Override
- public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
- return rightAggValue + leftAggValue;
- }
- },
- SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
- Serdes.Long(), /* serde for aggregate value */
- "sessionized-aggregated-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream
+ .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
+ .aggregate(
+ new Initializer<Long>() { /* initializer */
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<String, Long, Long>() { /* adder */
+ @Override
+ public Long apply(String aggKey, Long newValue, Long aggValue) {
+ return aggValue + newValue;
+ }
+ },
+ new Merger<String, Long>() { /* session merger */
+ @Override
+ public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
+ return rightAggValue + leftAggValue;
+ }
+ },
+ Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */
+ .withValueSerde(Serdes.Long()) /* serde for aggregate value */
+ );
@@ -1478,12 +1485,10 @@ Note that in the WordCountProcessor
implementation, users need to r
KGroupedTable<String, Long> groupedTable = ...;
// Counting a KGroupedStream
- KTable<String, Long> aggregatedStream = groupedStream.count(
- "counted-stream-store" /* state store name */);
+ KTable<String, Long> aggregatedStream = groupedStream.count();
// Counting a KGroupedTable
- KTable<String, Long> aggregatedTable = groupedTable.count(
- "counted-table-store" /* state store name */);
+ KTable<String, Long> aggregatedTable = groupedTable.count();
Detailed behavior for KGroupedStream
:
@@ -1518,14 +1523,14 @@ Note that in the WordCountProcessor
implementation, users need to r
KGroupedStream<String, Long> groupedStream = ...;
// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
- KTable<Windowed<String>, Long> aggregatedStream = groupedStream.count(
- TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
- "time-windowed-counted-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> aggregatedStream = groupedStream
+ .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
+ .count();
// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
- KTable<Windowed<String>, Long> aggregatedStream = groupedStream.count(
- SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
- "sessionized-counted-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> aggregatedStream = groupedStream
+ .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
+ .count();
Detailed behavior:
@@ -1561,14 +1566,14 @@ Note that in the WordCountProcessor
implementation, users need to r
// Reducing a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.reduce(
- (aggValue, newValue) -> aggValue + newValue, /* adder */
- "reduced-stream-store" /* state store name */);
+ (aggValue, newValue) -> aggValue + newValue /* adder */
+ );
// Reducing a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
- (aggValue, oldValue) -> aggValue - oldValue, /* subtractor */
- "reduced-table-store" /* state store name */);
+ (aggValue, oldValue) -> aggValue - oldValue /* subtractor */
+ );
// Java 7 examples
@@ -1580,8 +1585,8 @@ Note that in the WordCountProcessor
implementation, users need to r
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
- },
- "reduced-stream-store" /* state store name */);
+ }
+ );
// Reducing a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.reduce(
@@ -1596,8 +1601,8 @@ Note that in the WordCountProcessor
implementation, users need to r
public Long apply(Long aggValue, Long oldValue) {
return aggValue - oldValue;
}
- },
- "reduced-table-store" /* state store name */);
+ }
+ );
Detailed behavior for KGroupedStream
:
@@ -1659,41 +1664,39 @@ Note that in the WordCountProcessor
implementation, users need to r
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
- KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce(
- (aggValue, newValue) -> aggValue + newValue, /* adder */
- TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
- "time-windowed-reduced-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
+ .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
+ .reduce((aggValue, newValue) -> aggValue + newValue /* adder */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
- KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.reduce(
- (aggValue, newValue) -> aggValue + newValue, /* adder */
- SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
- "sessionized-reduced-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream
+ .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
+ .reduce((aggValue, newValue) -> aggValue + newValue); /* adder */
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
- KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce(
- new Reducer<Long>() { /* adder */
- @Override
- public Long apply(Long aggValue, Long newValue) {
- return aggValue + newValue;
- }
- },
- TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
- "time-windowed-reduced-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
+ .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
+ .reduce(
+ new Reducer<Long>() { /* adder */
+ @Override
+ public Long apply(Long aggValue, Long newValue) {
+ return aggValue + newValue;
+ }
+ });
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
- KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce(
- new Reducer<Long>() { /* adder */
- @Override
- public Long apply(Long aggValue, Long newValue) {
- return aggValue + newValue;
- }
- },
- SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
- "sessionized-reduced-stream-store" /* state store name */);
+ KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
+ .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
+ .reduce(
+ new Reducer<Long>() { /* adder */
+ @Override
+ public Long apply(Long aggValue, Long newValue) {
+ return aggValue + newValue;
+ }
+ });
@@ -1723,16 +1726,22 @@ Note that in the WordCountProcessor
implementation, users need to r
// Key: word, value: count + Properties streamsProperties == ...; + + // specify the default serdes so we don't need to elsewhere. + streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + StreamsConfig config = new StreamsConfig(streamsProperties); + KStream<String, Integer> wordCounts = ...; KGroupedStream<String, Integer> groupedStream = wordCounts - .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer())); + .groupByKey(); KTable<String, Integer> aggregated = groupedStream.aggregate( () -> 0, /* initializer */ - (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ - Serdes.Integer(), /* serde for aggregate value */ - "aggregated-stream-store" /* state store name */); + (aggKey, newValue, aggValue) -> aggValue + newValue /* adder */ + );
@@ -1836,8 +1845,10 @@ Note that in the WordCountProcessor
implementation, users need to r
() -> 0, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
(aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
- Serdes.Integer(), /* serde for aggregate value */
- "aggregated-table-store" /* state store name */);
+ Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store")
+ .withKeySerde(Serdes.String() /* serde for aggregate key */)
+ .withValueSerde(Serdes.Long() /* serde for aggregate value */)
+ );
Impact of record caches:
@@ -2253,7 +2264,10 @@ Note that in the WordCountProcessor
implementation, users need to r
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a window state store named "CountsWindowStore" that contains the word counts for every minute
- groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
+ groupedByWord.windowedBy(TimeWindows.of(60000))
+ .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore")
+ withKeySerde(Serdes.String()); // count() sets value serde to Serdes.Long() automatically
+ );
@@ -2396,14 +2410,14 @@ Note that in the WordCountProcessor
implementation, users need to r
Topology topology = ...;
ProcessorSupplier processorSuppler = ...;
- // Create CustomStoreSupplier for store name the-custom-store
- MyCustomStoreSuppler customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
+ // Create CustomStoreBuilder for store name the-custom-store
+ MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store");
// Add the source topic
topology.addSource("input", "inputTopic");
// Add a custom processor that reads from the source topic
topology.addProcessor("the-processor", processorSupplier, "input");
// Connect your custom state store to the custom processor above
- topology.addStateStore(customStoreSupplier, "the-processor");
+ topology.addStateStore(customStoreBuilder, "the-processor");
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
@@ -2478,7 +2492,7 @@ Note that in the WordCountProcessor
implementation, users need to r
// This call to `count()` creates a state store named "word-count".
// The state store is discoverable and can be queried interactively.
- groupedByWord.count("word-count");
+ groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count"));
// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -2835,22 +2849,20 @@ Note that in the WordCountProcessor
implementation, users need to r
For changelog topics you can also override the default configs on a per store basis.
- This can be done by using any method overload that has a StateStoreSupplier
as a parameter:
+ This can be done by using any method overload that has a Materialized
as a parameter:
// a map to add topic config - Map<String, String> topicConfig = new HashMap<>(); + Map<String, String> topicConfig = new HashMap<>(); topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000"); - StateStoreSupplier supplier = Stores.create("store") - .withKeys(Serdes.String()) - .withValues(Serdes.String()) - .persistent() - .enableLogging(topicConfig) // pass in the config overrides - .build(); - - groupedStream.count(supplier) + final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .withLoggingEnabled(topicConfig); // pass in the config overrides + + groupedStream.count(materialized)