MINOR: Document how to create source streams and tables

Originally reviewed as part of https://github.com/apache/kafka/pull/3490.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3701 from enothereska/minor-docs-create-source-streams
This commit is contained in:
Eno Thereska 2017-08-22 14:14:35 +01:00 committed by Damian Guy
parent ee8e9349b2
commit ecea150bde
1 changed files with 115 additions and 10 deletions

View File

@ -450,21 +450,126 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
If these records a KStream and the stream processing application were to sum the values it would return <code>4</code>. If these records were a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered as an update.
<h4><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h4>
<h4><a id="streams_dsl_source" href="#streams_dsl_source">Creating Source Streams from Kafka</a></h4>
<p>
Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code> or <code>GlobalKTable</code>)
can be created as a source stream from one or more Kafka topics (for <code>KTable</code> and <code>GlobalKTable</code> you can only create the source stream
from a single topic).
You can easily read data from Kafka topics into your application. We support the following operations.
</p>
<table class="data-table" border="1">
<tbody><tr>
<th>Reading from Kafka</th>
<th>Description</th>
</tr>
<tr>
<td><b>Stream</b>: input topic(s) &rarr; <code>KStream</code></td>
<td>Create a <code>KStream</code> from the specified Kafka input topic(s), interpreting the data as a record stream.
A <code>KStream</code> represents a partitioned record stream.
<p>
Slightly simplified, in the case of a KStream, the local KStream instance of every application instance will be populated
with data from only a <b>subset</b> of the partitions of the input topic. Collectively, i.e. across all application instances,
all the partitions of the input topic will be read and processed.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
StreamsBuilder builder = new StreamsBuilder();
KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
GlobalKTable&lt;String, GenericRecord&gt; source2 = builder.globalTable("topic4", "globalStoreName");
KStream&lt;String, Long&gt; wordCounts = builder.stream(
Serdes.String(), /* key serde */
Serdes.Long(), /* value serde */
"word-counts-input-topic" /* input topic */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic(s) do not match
the configured default serdes. </li>
</ul>
Several variants of <code>stream</code> exist to e.g. specify a regex pattern for input topics to read from.</td>
</tr>
<tr>
<td><b>Table</b>: input topic(s) &rarr; <code>KTable</code></td>
<td>
Reads the specified Kafka input topic into a <code>KTable</code>. The topic is interpreted as a changelog stream,
where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or
as DELETE (when the value is null) for that key.
<p>
Slightly simplified, in the case of a KTable, the local KTable instance of every application instance will be populated
with data from only a subset of the partitions of the input topic. Collectively, i.e. across all application instances, all
the partitions of the input topic will be read and processed.
</p>
<p>
You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
When a name is provided, the table can be queryied using <a href="#streams_interactive_queries">interactive queries</a>.
When a name is not provided the table will not queryable and an internal name will be provided for the state store.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
StreamsBuilder builder = new StreamsBuilder();
KTable&lt;String, Long&gt; wordCounts = builder.table(
Serdes.String(), /* key serde */
Serdes.Long(), /* value serde */
"word-counts-input-topic", /* input topic */
"word-counts-partitioned-store" /* table/store name */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
match the configured default serdes.</li>
</ul>
Several variants of <code>table</code> exist to e.g. specify the <code>auto.offset.reset</code>
policy to be used when reading from the input topic.
</td>
<tr>
<td><b>Global Table</b>: input topic &rarr; <code>GlobalKTable</code></td>
<td>
Reads the specified Kafka input topic into a <code>GlobalKTable</code>. The topic is interpreted as a changelog stream, where records
with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or as DELETE (when the
value is <code>null</code>) for that key.
<p>
Slightly simplified, in the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be
populated with data from all the partitions of the input topic. In other words, when using a global table, every application
instance will get its own, full copy of the topic's data.
</p>
<p>
You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
When a name is provided, the table can be queryied using <a href="#streams_interactive_queries">interactive queries</a>.
When a name is not provided the table will not queryable and an internal name will be provided for the state store.
</p>
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable&lt;String, Long&gt; wordCounts = builder.globalTable(
Serdes.String(), /* key serde */
Serdes.Long(), /* value serde */
"word-counts-input-topic", /* input topic */
"word-counts-global-store" /* table/store name */);
</pre>
When to provide serdes explicitly:
<ul>
<li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
<li>You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
match the configured default serdes.</li>
</ul>
Several variants of <code>globalTable<code> exist to e.g. specify explicit serdes.
</td>
</tbody>
</table>
<h4><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h4>
A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows: