MINOR: stateful docs for aggregates

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3730 from enothereska/minor-docs-aggregates
This commit is contained in:
Eno Thereska 2017-08-30 10:09:08 +01:00 committed by Damian Guy
parent 949577ca77
commit bd54d2e3e0
2 changed files with 815 additions and 16 deletions

Binary image file not shown (size after: 120 KiB).

@ -565,7 +565,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
match the configured default serdes.</li>
</ul>
Several variants of <code>globalTable<code> exist to e.g. specify explicit serdes.
Several variants of <code>globalTable</code> exist to e.g. specify explicit serdes.
</td>
</tbody>
@ -575,7 +575,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<p>
<code>KStream</code> and <code>KTable</code> support a variety of transformation operations. Each of these operations
can be translated into one or more connected processors in the underlying processor topology. Since <code>KStream</code>
and <code>KTable</code> are strongly typed, all these transformation operations are defined as generics functions where
and <code>KTable</code> are strongly typed, all these transformation operations are defined as generic functions where
users could specify the input and output data types.
</p>
<p>
@ -1014,13 +1014,819 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<h5><a id="streams_dsl_transformations_stateful" href="#streams_dsl_transformations_stateful">Stateful transformations</a></h5>
Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not
require a state store associated with the stream processor. Stateful transformations, by definition, depend on state for processing
inputs and producing outputs, and hence implementation-wise they require a state store associated with the stream processor. For
example, in aggregating operations, a windowing state store is used to store the latest aggregation results per window; in join
operations, a windowing state store is used to store all the records received so far within the defined window boundary.
<h6><a id="streams_dsl_transformations_stateful_overview" href="#streams_dsl_transformations_stateful_overview">Overview</a></h6>
<p>
Stateful transformations, by definition, depend on state for processing inputs and producing outputs, and
hence implementation-wise they require a state store associated with the stream processor. For example,
in aggregating operations, a windowing state store is used to store the latest aggregation results per window;
in join operations, a windowing state store is used to store all the records received so far within the
defined window boundary.
</p>
<p>
Note that state stores are fault-tolerant. In case of failure, Kafka Streams guarantees to fully restore
all state stores before resuming processing.
</p>
<p>
Available stateful transformations in the DSL include:
<ul>
<li><a href="#streams_dsl_aggregations">Aggregating</a></li>
<li><a href="#streams_dsl_joins">Joining</a></li>
<li><a href="#streams_dsl_windowing">Windowing (as part of aggregations and joins)</a></li>
<li>Applying custom processors and transformers, which may be stateful, for Processor API integration</li>
</ul>
</p>
<p>
The following diagram shows their relationships:
</p>
<figure>
<img class="centered" src="/{{version}}/images/streams-stateful_operations.png" style="width:500pt;">
<figcaption style="text-align: center;"><i>Stateful transformations in the DSL</i></figcaption>
</figure>
<p>
We will discuss the various stateful transformations in detail in the subsequent sections. However, let's start
with a first example of a stateful application: the canonical WordCount algorithm.
</p>
<p>
WordCount example in Java 8+, using lambda expressions:
</p>
<pre class="brush: java;">
// We assume record values represent lines of text. For the sake of this example, we ignore
// whatever may be stored in the record keys.
KStream&lt;String, String&gt; textLines = ...;
KStream&lt;String, Long&gt; wordCounts = textLines
// Split each text line, by whitespace, into words. The text lines are the record
// values, i.e. we can ignore whatever data is in the record keys and thus invoke
// `flatMapValues` instead of the more generic `flatMap`.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the stream by word to ensure the key of the record is the word.
.groupBy((key, word) -> word)
// Count the occurrences of each word (record key).
//
// This will change the stream type from `KGroupedStream&lt;String, String&gt;` to
// `KTable&lt;String, Long&gt;` (word -> count). We must provide a name for
// the resulting KTable, which will be used to name e.g. its associated
// state store and changelog topic.
.count("Counts")
// Convert the `KTable&lt;String, Long&gt;` into a `KStream&lt;String, Long&gt;`.
.toStream();
</pre>
<p>
WordCount example in Java 7:
</p>
<pre class="brush: java;">
// Code below is equivalent to the previous Java 8+ example above.
KStream&lt;String, String&gt; textLines = ...;
KStream&lt;String, Long&gt; wordCounts = textLines
.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
@Override
public Iterable&lt;String&gt; apply(String value) {
return Arrays.asList(value.toLowerCase().split("\\W+"));
}
})
.groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
@Override
public String apply(String key, String word) {
return word;
}
})
.count("Counts")
.toStream();
</pre>
<h6><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate a stream</a></h6>
<p>
Once records are grouped by key via <code>groupByKey</code> or <code>groupBy</code> -- and
thus represented as either a <code>KGroupedStream</code> or a
<code>KGroupedTable</code> -- they can be aggregated via an operation such as
<code>reduce</code>. Aggregations are <i>key-based</i> operations, i.e.
they always operate over records (notably record values) <i>of the same key</i>. You may
choose to perform aggregations on
<a href="#streams_dsl_windowing">windowed</a> or non-windowed data.
</p>
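<p>
For example, a grouping step might look as follows. This is a minimal sketch for illustration only: the
topic name <code>"input-topic"</code> and the use of String serdes are assumptions, not prescribed by the DSL.
</p>
<pre class="brush: java;">
KStreamBuilder builder = new KStreamBuilder();
// Assumed for illustration: a topic whose record keys and values are Strings.
KStream&lt;String, String&gt; stream = builder.stream(Serdes.String(), Serdes.String(), "input-topic");
// Variant 1: group by the existing record key; the key is unchanged, so no repartitioning
// is required if the stream is already keyed as needed.
KGroupedStream&lt;String, String&gt; groupedByKey = stream.groupByKey(Serdes.String(), Serdes.String());
// Variant 2: group by a newly selected key (here: the record value), which may cause
// a repartitioning of the stream before the subsequent aggregation.
KGroupedStream&lt;String, String&gt; groupedByValue = stream.groupBy((key, value) -> value, Serdes.String(), Serdes.String());
</pre>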
<table class="data-table" border="1">
<tbody>
<tr>
<th>Transformation</th>
<th>Description</th>
</tr>
<tr>
<td><b>Aggregate</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable
&rarr; KTable</code></td>
<td>
<p>
<b>Rolling aggregation</b>. Aggregates the values of (non-windowed) records by
the grouped key. Aggregating is a generalization of <code>reduce</code> and allows, for example, the
aggregate value to have a different type than the input values.
</p>
<p>
When aggregating a grouped stream, you must provide an initializer (think:
<code>aggValue = 0</code>) and an "adder"
aggregator (think: <code>aggValue + curValue</code>). When aggregating a <i>grouped</i>
table, you must additionally provide a "subtractor" aggregator (think: <code>aggValue - oldValue</code>).
</p>
<p>
Several variants of <code>aggregate</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
KGroupedStream&lt;byte[], String&gt; groupedStream = ...;
KGroupedTable&lt;byte[], String&gt; groupedTable = ...;
// Java 8+ examples, using lambda expressions
// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedStream = groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
Serdes.Long(), /* serde for aggregate value */
"aggregated-stream-store" /* state store name */);
// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
(aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
Serdes.Long(), /* serde for aggregate value */
"aggregated-table-store" /* state store name */);
// Java 7 examples
// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
@Override
public Long apply(byte[] aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
Serdes.Long(),
"aggregated-stream-store");
// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
@Override
public Long apply(byte[] aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* subtractor */
@Override
public Long apply(byte[] aggKey, String oldValue, Long aggValue) {
return aggValue - oldValue.length();
}
},
Serdes.Long(),
"aggregated-table-store");
</pre>
<p>
Detailed behavior of <code>KGroupedStream</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, the initializer is called
(and called before the adder).</li>
<li>Whenever a record with a non-null value is received, the adder is called.</li>
</ul>
<p>
Detailed behavior of <code>KGroupedTable</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, the initializer is called
(and called before the adder and subtractor). Note that, in contrast to <code>KGroupedStream</code>, over
time the initializer may be called more
than once for a key as a result of having received input tombstone records
for that key (see below).</li>
<li>When the first non-<code>null</code> value is received for a key (think:
INSERT), then only the adder is called.</li>
<li>When subsequent non-<code>null</code> values are received for a key (think:
UPDATE), then (1) the subtractor is called
with the old value as stored in the table and (2) the adder is called with
the new value of the input record
that was just received. The order of execution for the subtractor and adder
is not defined.</li>
<li>When a tombstone record -- i.e. a record with a <code>null</code> value -- is
received for a key (think: DELETE), then
only the subtractor is called. Note that, whenever the subtractor returns a
<code>null</code> value itself, then the
corresponding key is removed from the resulting KTable. If that happens, any
next input record for that key will trigger the initializer again.</li>
</ul>
<p>
See the example at the bottom of this section for a visualization of the
aggregation semantics.
</p>
</td>
</tr>
<tr>
<td><b>Aggregate (windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
<td>
<p>
<b>Windowed aggregation</b>. Aggregates the values of records, per window, by
the grouped key. Aggregating is a generalization of
<code>reduce</code> and allows, for example, the aggregate value to have a
different type than the input values.
</p>
<p>
You must provide an initializer (think: <code>aggValue = 0</code>), "adder"
aggregator (think: <code>aggValue + curValue</code>),
and a window. When windowing based on sessions, you must additionally provide a
"session merger" aggregator (think:
<code>mergedAggValue = leftAggValue + rightAggValue</code>).
</p>
<p>
The windowed <code>aggregate</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
</p>
<p>
Several variants of <code>aggregate</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
Serdes.Long(), /* serde for aggregate value */
"time-windowed-aggregated-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
(aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
Serdes.Long(), /* serde for aggregate value */
"sessionized-aggregated-stream-store" /* state store name */);
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;String, Long, Long&gt;() { /* adder */
@Override
public Long apply(String aggKey, Long newValue, Long aggValue) {
return aggValue + newValue;
}
},
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
Serdes.Long(), /* serde for aggregate value */
"time-windowed-aggregated-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;String, Long, Long&gt;() { /* adder */
@Override
public Long apply(String aggKey, Long newValue, Long aggValue) {
return aggValue + newValue;
}
},
new Merger&lt;String, Long&gt;() { /* session merger */
@Override
public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
return rightAggValue + leftAggValue;
}
},
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
Serdes.Long(), /* serde for aggregate value */
"sessionized-aggregated-stream-store" /* state store name */);
</pre>
<p>
Detailed behavior:
</p>
<ul>
<li>The windowed aggregate behaves similarly to the rolling aggregate described
above. The additional twist is that the behavior applies per window.</li>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, the
initializer is called (and called before the adder).</li>
<li>Whenever a record with a non-<code>null</code> value is received for a given window, the
adder is called.
(Note: As a result of a known bug in Kafka 0.11.0.0, the adder is currently
also called for <code>null</code> values. You can work around this, for example, by
manually filtering out <code>null</code> values prior to grouping the stream.)</li>
<li>When using session windows: the session merger is called whenever two
sessions are being merged.</li>
</ul>
<p>
See the example at the bottom of this section for a visualization of the aggregation semantics.
</p>
</td>
</tr>
<tr>
<td><b>Count</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable &rarr; KTable</code></td>
<td>
<p>
<b>Rolling aggregation</b>. Counts the number of records by the grouped key.
Several variants of <code>count</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
KGroupedTable&lt;String, Long&gt; groupedTable = ...;
// Counting a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count(
"counted-stream-store" /* state store name */);
// Counting a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count(
"counted-table-store" /* state store name */);
</pre>
<p>
Detailed behavior for <code>KGroupedStream</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys or values are ignored.</li>
</ul>
<p>
Detailed behavior for <code>KGroupedTable</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored. Records with <code>null</code>
values are not ignored but interpreted as "tombstones" for the corresponding key, which
indicate the deletion of the key from the table.</li>
</ul>
</td>
</tr>
<tr>
<td><b>Count (Windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
<td>
<p>
<b>Windowed aggregation</b>. Counts the number of records, per window, by the grouped key.
</p>
<p>
The windowed <code>count</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
</p>
<p>
Several variants of <code>count</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.count(
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-counted-stream-store" /* state store name */);
// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.count(
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-counted-stream-store" /* state store name */);
</pre>
<p>
Detailed behavior:
</p>
<ul>
<li>Input records with <code>null</code> keys or values are ignored. (Note: As a result of a known bug in Kafka 0.11.0.0,
records with <code>null</code> values are not ignored yet. You can work around this, for example, by manually
filtering out <code>null</code> values prior to grouping the stream.)</li>
</ul>
</td>
</tr>
<tr>
<td><b>Reduce</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable &rarr; KTable</code></td>
<td>
<p>
<b>Rolling aggregation</b>. Combines the values of (non-windowed) records by the grouped key. The current record value is
combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed,
unlike <code>aggregate</code>.
</p>
<p>
When reducing a grouped stream, you must provide an "adder" reducer (think: <code>aggValue + curValue</code>).
When reducing a grouped table, you must additionally provide a "subtractor" reducer (think: <code>aggValue - oldValue</code>).
</p>
<p>
Several variants of <code>reduce</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
KGroupedTable&lt;String, Long&gt; groupedTable = ...;
// Java 8+ examples, using lambda expressions
// Reducing a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
"reduced-stream-store" /* state store name */);
// Reducing a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
(aggValue, oldValue) -> aggValue - oldValue, /* subtractor */
"reduced-table-store" /* state store name */);
// Java 7 examples
// Reducing a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
"reduced-stream-store" /* state store name */);
// Reducing a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
new Reducer&lt;Long&gt;() { /* subtractor */
@Override
public Long apply(Long aggValue, Long oldValue) {
return aggValue - oldValue;
}
},
"reduced-table-store" /* state store name */);
</pre>
<p>
Detailed behavior for <code>KGroupedStream</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that
record is used as the initial aggregate value.</li>
<li>Whenever a record with a non-<code>null</code> value is received, the adder is called.</li>
</ul>
<p>
Detailed behavior for <code>KGroupedTable</code>:
</p>
<ul>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time, then the value of that
record is used as the initial aggregate value.
Note that, in contrast to <code>KGroupedStream</code>, over time this initialization step
may happen more than once for a key as a
result of having received input tombstone records for that key (see below).</li>
<li>When the first non-<code>null</code> value is received for a key (think: INSERT), then
only the adder is called.</li>
<li>When subsequent non-<code>null</code> values are received for a key (think: UPDATE), then
(1) the subtractor is called with the
old value as stored in the table and (2) the adder is called with the new
value of the input record that was just received.
The order of execution for the subtractor and adder is not defined.</li>
<li>When a tombstone record -- i.e. a record with a <code>null</code> value -- is received
for a key (think: DELETE), then only the
subtractor is called. Note that, whenever the subtractor returns a <code>null</code>
value itself, then the corresponding key
is removed from the resulting KTable. If that happens, any next input
record for that key will re-initialize its aggregate value.</li>
</ul>
<p>
See the example at the bottom of this section for a visualization of the
aggregation semantics.
</p>
</td>
</tr>
<tr>
<td><b>Reduce (windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
<td>
<p>
<b>Windowed aggregation</b>. Combines the values of records, per window, by the grouped key. The current record value
is combined with the last reduced value, and a new reduced value is returned. Records with <code>null</code> key or value are
ignored. The result value type cannot be changed, unlike <code>aggregate</code>.
</p>
<p>
The windowed <code>reduce</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
</p>
<p>
Several variants of <code>reduce</code> exist, see Javadocs for details.
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
KGroupedStream&lt;String, Long&gt; groupedStream = ...;
// Java 8+ examples, using lambda expressions
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-reduced-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-reduced-stream-store" /* state store name */);
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */
"time-windowed-reduced-stream-store" /* state store name */);
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */
"sessionized-reduced-stream-store" /* state store name */);
</pre>
<p>
Detailed behavior:
</p>
<ul>
<li>The windowed reduce behaves similarly to the rolling reduce described above. The additional twist is that
the behavior applies per window.</li>
<li>Input records with <code>null</code> keys are ignored in general.</li>
<li>When a record key is received for the first time for a given window, then the value of that record is
used as the initial aggregate value.</li>
<li>Whenever a record with a non-<code>null</code> value is received for a given window, the adder is called. (Note: As
a result of a known bug in Kafka 0.11.0.0, the adder is currently also called for <code>null</code> values. You can work
around this, for example, by manually filtering out <code>null</code> values prior to grouping the stream.)</li>
<li>See the example at the bottom of this section for a visualization of the aggregation semantics.</li>
</ul>
</td>
</tr>
</tbody>
</table>
<p>
<b>Example of semantics for stream aggregations</b>: A <code>KGroupedStream &rarr; KTable</code> example is shown below. The streams and the table are
initially empty. We use bold font in the column for "KTable <code>aggregated</code>" to highlight changed state. An entry such as <code>(hello, 1)</code>
denotes a record with key <code>hello</code> and value <code>1</code>. To improve the readability of the semantics table we assume that all records are
processed in timestamp order.
</p>
<pre class="brush: java;">
// Key: word, value: count
KStream&lt;String, Integer&gt; wordCounts = ...;
KGroupedStream&lt;String, Integer&gt; groupedStream = wordCounts
.groupByKey(Serdes.String(), Serdes.Integer());
KTable&lt;String, Integer&gt; aggregated = groupedStream.aggregate(
() -> 0, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
Serdes.Integer(), /* serde for aggregate value */
"aggregated-stream-store" /* state store name */);
</pre>
<p>
<b>Impact of <a href="#streams_developer-guide_memory-management_record-cache">record caches</a></b>: For illustration purposes,
the column "KTable <code>aggregated</code>" below shows the table's state changes over
time in a very granular way. In practice, you would observe state changes in such a granular way only when record caches are
disabled (default: enabled). When record caches are enabled, what might happen for example is that the output results of the
rows with timestamps 4 and 5 would be compacted, and there would only be a single state update for the key <code>kafka</code> in the KTable
(here: from <code>(kafka, 1)</code> directly to <code>(kafka, 3)</code>). Typically, you should only disable record caches for testing or debugging purposes
-- under normal circumstances it is better to leave record caches enabled.
</p>
<table class="data-table" border="1">
<thead>
<col>
<colgroup span="2"></colgroup>
<colgroup span="2"></colgroup>
<col>
<tr>
<th scope="col"></th>
<th colspan="2">KStream wordCounts</th>
<th colspan="2">KGroupedStream groupedStream</th>
<th scope="col">KTable aggregated</th>
</tr>
</thead>
<tbody>
<tr>
<th scope="col">Timestamp</th>
<th scope="col">Input record</th>
<th scope="col">Grouping</th>
<th scope="col">Initializer</th>
<th scope="col">Adder</th>
<th scope="col">State</th>
</tr>
<tr>
<td>1</td>
<td>(hello, 1)</td>
<td>(hello, 1)</td>
<td>0 (for hello)</td>
<td>(hello, 0 + 1)</td>
<td><b>(hello, 1)</b></td>
</tr>
<tr>
<td>2</td>
<td>(kafka, 1)</td>
<td>(kafka, 1)</td>
<td>0 (for kafka)</td>
<td>(kafka, 0 + 1)</td>
<td>(hello, 1), <b>(kafka, 1)</b></td>
</tr>
<tr>
<td>3</td>
<td>(streams, 1)</td>
<td>(streams, 1)</td>
<td>0 (for streams)</td>
<td>(streams, 0 + 1)</td>
<td>(hello, 1), (kafka, 1), <b>(streams, 1)</b></td>
</tr>
<tr>
<td>4</td>
<td>(kafka, 1)</td>
<td>(kafka, 1)</td>
<td></td>
<td>(kafka, 1 + 1)</td>
<td>(hello, 1), <b>(kafka, 2)</b>, (streams, 1)</td>
</tr>
<tr>
<td>5</td>
<td>(kafka, 1)</td>
<td>(kafka, 1)</td>
<td></td>
<td>(kafka, 2 + 1)</td>
<td>(hello, 1), <b>(kafka, 3)</b>, (streams, 1)</td>
</tr>
<tr>
<td>6</td>
<td>(streams, 1)</td>
<td>(streams, 1)</td>
<td></td>
<td>(streams, 1 + 1)</td>
<td>(hello, 1), (kafka, 3), <b>(streams, 2)</b></td>
</tr>
</tbody>
</table>
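<p>
As noted above, such granular state changes are only observable when record caches are disabled. A minimal
sketch of disabling the cache for testing purposes is shown below; the application id and bootstrap servers
are placeholder values chosen for illustration.
</p>
<pre class="brush: java;">
import java.util.Properties;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example"); // placeholder application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
// A cache size of zero disables record caching, so every single update is forwarded downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
StreamsConfig config = new StreamsConfig(props);
</pre>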
<p>
<b>Example of semantics for table aggregations</b>: A <code>KGroupedTable &rarr; KTable</code> example is shown below. The tables are initially empty.
We use bold font in the column for "KTable <code>aggregated</code>" to highlight changed state. An entry such as <code>(hello, 1)</code> denotes a
record with key <code>hello</code> and value <code>1</code>. To improve the readability of the semantics table we assume that all records are processed
in timestamp order.
</p>
<pre class="brush: java;">
// Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")
KTable&lt;String, String&gt; userProfiles = ...;
// Re-group `userProfiles`. Don't read too much into what the grouping does:
// its prime purpose in this example is to show the *effects* of the grouping
// in the subsequent aggregation.
KGroupedTable&lt;String, Integer&gt; groupedTable = userProfiles
.groupBy((user, region) -> KeyValue.pair(region, user.length()), Serdes.String(), Serdes.Integer());
KTable&lt;String, Integer&gt; aggregated = groupedTable.aggregate(
() -> 0, /* initializer */
(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
(aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
Serdes.Integer(), /* serde for aggregate value */
"aggregated-table-store" /* state store name */);
</pre>
<p>
<b>Impact of <a href="#streams_developer-guide_memory-management_record-cache">record caches</a></b>:
For illustration purposes, the column "KTable <code>aggregated</code>" below shows
the table's state changes over time in a very granular way. In practice, you would observe state changes
in such a granular way only when record caches are disabled (default: enabled). When record caches are enabled,
what might happen for example is that the output results of the rows with timestamps 4 and 5 would be
compacted, and there would only be a single state update for the key <code>A</code> in the KTable
(here: from <code>(A, 10)</code> directly to <code>(A, 8)</code>). Typically, you should only disable
record caches for testing or debugging purposes -- under normal circumstances it is better to leave record caches enabled.
</p>
<table class="data-table" border="1">
<thead>
<col>
<colgroup span="2"></colgroup>
<colgroup span="2"></colgroup>
<col>
<tr>
<th scope="col"></th>
<th colspan="3">KTable userProfiles</th>
<th colspan="3">KGroupedTable groupedTable</th>
<th scope="col">KTable aggregated</th>
</tr>
</thead>
<tbody>
<tr>
<th scope="col">Timestamp</th>
<th scope="col">Input record</th>
<th scope="col">Interpreted as</th>
<th scope="col">Grouping</th>
<th scope="col">Initializer</th>
<th scope="col">Adder</th>
<th scope="col">Subtractor</th>
<th scope="col">State</th>
</tr>
<tr>
<td>1</td>
<td>(alice, E)</td>
<td>INSERT alice</td>
<td>(E, 5)</td>
<td>0 (for E)</td>
<td>(E, 0 + 5)</td>
<td></td>
<td><b>(E, 5)</b></td>
</tr>
<tr>
<td>2</td>
<td>(bob, A)</td>
<td>INSERT bob</td>
<td>(A, 3)</td>
<td>0 (for A)</td>
<td>(A, 0 + 3)</td>
<td></td>
<td><b>(A, 3)</b>, (E, 5)</td>
</tr>
<tr>
<td>3</td>
<td>(charlie, A)</td>
<td>INSERT charlie</td>
<td>(A, 7)</td>
<td></td>
<td>(A, 3 + 7)</td>
<td></td>
<td><b>(A, 10)</b>, (E, 5)</td>
</tr>
<tr>
<td>4</td>
<td>(alice, A)</td>
<td>UPDATE alice</td>
<td>(A, 5)</td>
<td></td>
<td>(A, 10 + 5)</td>
<td>(E, 5 - 5)</td>
<td><b>(A, 15)</b>, <b>(E, 0)</b></td>
</tr>
<tr>
<td>5</td>
<td>(charlie, null)</td>
<td>DELETE charlie</td>
<td>(null, 7)</td>
<td></td>
<td></td>
<td>(A, 15 - 7)</td>
<td><b>(A, 8)</b>, (E, 0)</td>
</tr>
<tr>
<td>6</td>
<td>(null, E)</td>
<td>ignored</td>
<td></td>
<td></td>
<td></td>
<td></td>
<td>(A, 8), (E, 0)</td>
</tr>
<tr>
<td>7</td>
<td>(bob, E)</td>
<td>UPDATE bob</td>
<td>(E, 3)</td>
<td></td>
<td>(E, 0 + 3)</td>
<td>(A, 8 - 3)</td>
<td><b>(A, 5)</b>, <b>(E, 3)</b></td>
</tr>
</tbody>
</table>
<h6><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h6>
A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows:
@ -1066,14 +1872,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
Depending on the operands the following join operations are supported: <b>inner joins</b>, <b>outer joins</b> and <b>left joins</b>.
Their <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">semantics</a> are similar to the corresponding operators in relational databases.
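<p>
As a brief illustration of a windowed stream-stream join, consider the following sketch of an inner join.
The record types, serdes, and the 5-minute join window are assumptions chosen for illustration; see the
join variants in the Javadocs for the full set of options.
</p>
<pre class="brush: java;">
import java.util.concurrent.TimeUnit;
KStream&lt;String, Long&gt; left = ...;
KStream&lt;String, Double&gt; right = ...;
// Inner-join the two streams on their (shared) key, pairing up records whose timestamps
// are at most 5 minutes apart.
KStream&lt;String, String&gt; joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)), /* join window */
    Serdes.String(), /* key serde */
    Serdes.Long(),   /* left value serde */
    Serdes.Double()  /* right value serde */);
</pre>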
<h6><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate a stream</a></h6>
An <b>aggregation</b> operation takes one input stream and yields a new stream by combining multiple input records into a single output record. Examples of aggregations are computing counts or sums. An aggregation over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the aggregation may grow indefinitely.
<p>
In the Kafka Streams DSL, an input stream of an aggregation can be a <code>KStream</code> or a <code>KTable</code>, but the output stream will always be a <code>KTable</code>.
This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted.
When such late arrival happens, the aggregating <code>KStream</code> or <code>KTable</code> simply emits a new aggregate value. Because the output is a <code>KTable</code>, the new value is considered to overwrite the old value with the same key in subsequent processing steps.
</p>
<h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h4>
@ -1124,7 +1923,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<figure>
<img class="centerd" src="/{{version}}/images/streams-interactive-queries-01.png" style="width:600pt;">
<img class="centered" src="/{{version}}/images/streams-interactive-queries-01.png" style="width:600pt;">
<figcaption style="text-align: center;"><i>Without interactive queries: increased complexity and heavier footprint of architecture</i></figcaption>
</figure>
@ -1222,7 +2021,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
</p>
<figure>
<img class="centerd" src="/{{version}}/images/streams-interactive-queries-api-01.png" style="width:500pt;">
<img class="centered" src="/{{version}}/images/streams-interactive-queries-api-01.png" style="width:500pt;">
<figcaption style="text-align: center;"><i>Every application instance can directly query any of its local state stores</i></figcaption>
</figure>