From ecea150bdeb7cb98f1aa32eb2cf8ac4116e28f74 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Tue, 22 Aug 2017 14:14:35 +0100 Subject: [PATCH] MINOR: Document how to create source streams and tables Originally reviewed as part of https://github.com/apache/kafka/pull/3490. Author: Eno Thereska Reviewers: Damian Guy Closes #3701 from enothereska/minor-docs-create-source-streams --- docs/streams/developer-guide.html | 125 +++++++++++++++++++++++++++--- 1 file changed, 115 insertions(+), 10 deletions(-) diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index bcbce24ca15..e3acf538527 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -450,21 +450,126 @@ Note that in the WordCountProcessor implementation, users need to r If these records a KStream and the stream processing application were to sum the values it would return 4. If these records were a KTable or GlobalKTable, the return would be 3, since the last record would be considered as an update. -

Create Source Streams from Kafka

+

Creating Source Streams from Kafka

- Either a record stream (defined as KStream) or a changelog stream (defined as KTable or GlobalKTable) - can be created as a source stream from one or more Kafka topics (for KTable and GlobalKTable you can only create the source stream - from a single topic). + You can easily read data from Kafka topics into your application. We support the following operations.

+ + + + + + + + + + + + + + + + +
Reading from KafkaDescription
Stream: input topic(s) → KStreamCreate a KStream from the specified Kafka input topic(s), interpreting the data as a record stream. + A KStream represents a partitioned record stream. +

+ Slightly simplified, in the case of a KStream, the local KStream instance of every application instance will be populated + with data from only a subset of the partitions of the input topic. Collectively, i.e. across all application instances, + all the partitions of the input topic will be read and processed. +

+
+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.KStream;
 
-    
-    StreamsBuilder builder = new StreamsBuilder();
+                    StreamsBuilder builder = new StreamsBuilder();
 
-    KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
-    KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
-    GlobalKTable<String, GenericRecord> source2 = builder.globalTable("topic4", "globalStoreName");
-    
+ KStream<String, Long> wordCounts = builder.stream( + Serdes.String(), /* key serde */ + Serdes.Long(), /* value serde */ + "word-counts-input-topic" /* input topic */); +
+ When to provide serdes explicitly: +
    +
  • If you do not specify serdes explicitly, the default serdes from the configuration are used.
  • +
  • You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic(s) do not match + the configured default serdes.
  • +
+ Several variants of stream exist to e.g. specify a regex pattern for input topics to read from.
Table: input topic(s) → KTable + Reads the specified Kafka input topic into a KTable. The topic is interpreted as a changelog stream, + where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not null) or + as DELETE (when the value is null) for that key. +

+ Slightly simplified, in the case of a KTable, the local KTable instance of every application instance will be populated + with data from only a subset of the partitions of the input topic. Collectively, i.e. across all application instances, all + the partitions of the input topic will be read and processed. +

+

+ You may provide an optional name for the table (more precisely, for the internal state store that backs the table). + When a name is provided, the table can be queryied using interactive queries. + When a name is not provided the table will not queryable and an internal name will be provided for the state store. +

+
+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.KTable;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    KTable<String, Long> wordCounts = builder.table(
+                        Serdes.String(), /* key serde */
+                        Serdes.Long(),   /* value serde */
+                        "word-counts-input-topic", /* input topic */
+                        "word-counts-partitioned-store" /* table/store name */);
+                
+ + When to provide serdes explicitly: +
    +
  • If you do not specify serdes explicitly, the default serdes from the configuration are used.
  • +
  • You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic do not + match the configured default serdes.
  • +
+ + Several variants of table exist to e.g. specify the auto.offset.reset + policy to be used when reading from the input topic. +
Global Table: input topic → GlobalKTable + Reads the specified Kafka input topic into a GlobalKTable. The topic is interpreted as a changelog stream, where records + with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not null) or as DELETE (when the + value is null) for that key. +

+ Slightly simplified, in the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be + populated with data from all the partitions of the input topic. In other words, when using a global table, every application + instance will get its own, full copy of the topic's data. +

+

+ You may provide an optional name for the table (more precisely, for the internal state store that backs the table). + When a name is provided, the table can be queryied using interactive queries. + When a name is not provided the table will not queryable and an internal name will be provided for the state store. +

+
+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.GlobalKTable;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    GlobalKTable<String, Long> wordCounts = builder.globalTable(
+                        Serdes.String(), /* key serde */
+                        Serdes.Long(),   /* value serde */
+                        "word-counts-input-topic", /* input topic */
+                        "word-counts-global-store" /* table/store name */);
+                
+ + When to provide serdes explicitly: +
    +
  • If you do not specify serdes explicitly, the default serdes from the configuration are used.
  • +
  • You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic do not + match the configured default serdes.
  • +
+ Several variants of globalTable exist to e.g. specify explicit serdes. + +

Windowing a stream

A stream processor may need to divide data records into time buckets, i.e. to window the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows: