mirror of https://github.com/apache/kafka.git
MINOR: Make streams quick start more interactive
1. Make the WordCountDemo application not stop automatically but via "Ctrl-C".
2. Update the quickstart html file to let users type input messages one-by-one, and observe the added output in an interactive manner.
3. Some minor fixes on the parent documentation page pointing to streams sub-pages; added a new recommended Scala version number.

Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Michael G. Noll <michael@confluent.io>, Damian Guy <damian.guy@gmail.com>
Closes #3515 from guozhangwang/KMinor-interactive-quickstart
parent 5d798511b1 · commit 91c207c2c6
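The core of the change is identical in all three example programs: the fixed `Thread.sleep(5000L)` is replaced by a `CountDownLatch` that is released from a JVM shutdown hook, so each demo now runs until the user hits Ctrl-C. A skeleton assembled from the hunks below; the class name, application id, and the pass-through topology are illustrative placeholders, not part of the commit:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class InteractiveDemoSkeleton {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-interactive-skeleton"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // Minimal pass-through topology, mirroring PipeDemo; explicit serdes
        // avoid having to configure defaults.
        builder.stream(Serdes.String(), Serdes.String(), "streams-wordcount-input")
               .to(Serdes.String(), Serdes.String(), "streams-pipe-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch Ctrl-C: close the streams instance,
        // then release the latch so main() can return.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await(); // block until the shutdown hook fires
        } catch (Throwable e) {
            Exit.exit(1);
        }
        Exit.exit(0);
    }
}

Closing the streams instance inside the hook lets Kafka Streams stop its threads and close its state stores before the JVM exits.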
@@ -20,4 +20,5 @@ var context={
     "version": "0110",
     "dotVersion": "0.11.0",
-    "fullDotVersion": "0.11.0.0"
+    "fullDotVersion": "0.11.0.0",
+    "scalaVersion": "2.11"
 };
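For context, these variables are substituted into the docs pages wherever `{{...}}` placeholders appear, such as the download commands in the quickstart hunk below. The docs site's actual templating engine is not part of this diff; this small sketch only illustrates the substitution the new `scalaVersion` entry drives:

public class TemplateExample {
    public static void main(String[] args) {
        // Illustrative only: the real templating engine is not shown in this diff.
        String template = "tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz";
        String rendered = template
                .replace("{{scalaVersion}}", "2.11")
                .replace("{{fullDotVersion}}", "0.11.0.0");
        System.out.println(rendered); // tar -xzf kafka_2.11-0.11.0.0.tgz
    }
}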
@@ -40,10 +40,10 @@ of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/stream
 final Serde&lt;String&gt; stringSerde = Serdes.String();
 final Serde&lt;Long&gt; longSerde = Serdes.Long();
 
-// Construct a `KStream` from the input topic ""streams-file-input", where message values
+// Construct a `KStream` from the input topic "streams-wordcount-input", where message values
 // represent lines of text (for the sake of this example, we ignore whatever may be stored
 // in the message keys).
-KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
+KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "streams-wordcount-input");
 
 KTable&lt;String, Long&gt; wordCounts = textLines
     // Split each text line, by whitespace, into words.
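The hunk above shows only the head of the topology. For readers following along, here is a sketch of the full DSL pipeline with the renamed topics, assembled from this snippet plus the WordCountDemo hunks later in this diff; the `groupBy`/`count(String)` step is written against the 0.11 DSL and the lambda form is a stylistic choice here, not the commit's own code:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Locale;

public class WordCountTopologySketch {
    public static void main(String[] args) {
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();

        // Lines of text from the (renamed) input topic; keys are ignored.
        KStream<String, String> textLines =
                builder.stream(stringSerde, stringSerde, "streams-wordcount-input");

        KTable<String, Long> wordCounts = textLines
                // Split each text line, by whitespace, into words.
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                // Group by the word itself, then count occurrences per word.
                .groupBy((key, word) -> word)
                .count("Counts");

        // Write the changelog of counts to the output topic.
        wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
        // Starting and stopping the application is shown in the demo hunks below.
    }
}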
@@ -71,16 +71,18 @@ because it cannot know when it has processed "all" the input data.
 <p>
 As the first step, we will start Kafka (unless you already have it started) and then we will
 prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.
 </p>
 
 <h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step 1: Download the code</a></h4>
 
-<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_2.11-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
+<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
+Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here:
 
 <pre class="brush: bash;">
-> tar -xzf kafka_2.11-{{fullDotVersion}}.tgz
-> cd kafka_2.11-{{fullDotVersion}}
+> tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+> cd kafka_{{scalaVersion}}-{{fullDotVersion}}
 </pre>
 </p>
 
 <h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4>
 
 <p>
@@ -102,19 +104,9 @@ Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a> so you need to
 </pre>
 
 
-<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare data</a></h4>
+<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare input topic and start Kafka producer</a></h4>
 
-<!--
-<pre>
-> <b>./bin/kafka-topics --create \</b>
-    <b>--zookeeper localhost:2181 \</b>
-    <b>--replication-factor 1 \</b>
-    <b>--partitions 1 \</b>
-    <b>--topic streams-file-input</b>
-</pre>
--->
 
 <pre class="brush: bash;">
 > echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
@@ -126,41 +118,59 @@ Or on Windows:
 > echo|set /p=join kafka summit>> file-input.txt
 </pre>
 
-<p>
-Next, we send this input data to the input topic named <b>streams-file-input</b> using the console producer,
-which reads the data from STDIN line-by-line, and publishes each line as a separate Kafka message with null key and value encoded as a string to the topic (in practice,
-stream data will likely be flowing continuously into Kafka where the application will be up and running):
-</p>
--->
 
+Next, we create the input topic named <b>streams-wordcount-input</b> and the output topic named <b>streams-wordcount-output</b>:
+
 <pre class="brush: bash;">
 > bin/kafka-topics.sh --create \
     --zookeeper localhost:2181 \
     --replication-factor 1 \
     --partitions 1 \
-    --topic streams-file-input
+    --topic streams-wordcount-input
+Created topic "streams-wordcount-input".
+
+> bin/kafka-topics.sh --create \
+    --zookeeper localhost:2181 \
+    --replication-factor 1 \
+    --partitions 1 \
+    --topic streams-wordcount-output
+Created topic "streams-wordcount-output".
 </pre>
 
+The created topic can be described with the same <b>kafka-topics</b> tool:
+
 <pre class="brush: bash;">
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
+> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
+
+Topic:streams-wordcount-input   PartitionCount:1    ReplicationFactor:1 Configs:
+    Topic: streams-wordcount-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
+Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:
+    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0
 </pre>
 
-<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 4: Process data</a></h4>
+<h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4: Start the Wordcount Application</a></h4>
+
+The following command starts the WordCount demo application:
+
+<pre class="brush: bash;">
+> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
+</pre>
 
 <p>
-The demo application will read from the input topic <b>streams-file-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
+The demo application will read from the input topic <b>streams-wordcount-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
 and continuously write its current results to the output topic <b>streams-wordcount-output</b>.
 Hence there won't be any STDOUT output except log entries as the results are written back into Kafka.
-The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.
 </p>
-<p>
-We can now inspect the output of the WordCount demo application by reading from its output topic:
-</p>
+
+Now we can start the console producer in a separate terminal to write some input data to this topic:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+</pre>
+
+and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
 
 <pre class="brush: bash;">
 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic streams-wordcount-output \
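As an aside between these hunks: the topic setup shown above can also be scripted against the AdminClient that shipped in 0.11, instead of `kafka-topics.sh`. A hedged sketch, with an illustrative class name and assuming a broker on localhost:9092:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateQuickstartTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        AdminClient admin = AdminClient.create(props);
        try {
            // Mirrors the two kafka-topics.sh --create calls above:
            // one partition, replication factor one, for input and output.
            admin.createTopics(Arrays.asList(
                    new NewTopic("streams-wordcount-input", 1, (short) 1),
                    new NewTopic("streams-wordcount-output", 1, (short) 1)
            )).all().get(); // block until both topics exist
        } finally {
            admin.close();
        }
    }
}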
@@ -172,26 +182,114 @@ We can now inspect the output of the WordCount demo application by reading from
     --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 </pre>
 
+
+<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5: Process some data</a></h4>
+
+Now let's write some message with the console producer into the input topic <b>streams-wordcount-input</b> by entering a single line of text and then hit <RETURN>.
+This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered
+(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+</pre>
+
 <p>
-with the following output data being printed to the console:
+This message will be processed by the Wordcount application and the following output data will be written to the <b>streams-wordcount-output</b> topic and printed by the console consumer:
 </p>
 
+<pre class="brush: bash;">
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all     1
+streams 1
+lead    1
+to      1
+kafka   1
+</pre>
+
+<p>
+Here, the first column is the Kafka message key in <code>java.lang.String</code> format and represents a word that is being counted, and the second column is the message value in <code>java.lang.Long</code> format, representing the word's latest count.
+</p>
+
+Now let's continue writing one more message with the console producer into the input topic <b>streams-wordcount-input</b>.
+Enter the text line "hello kafka streams" and hit <RETURN>.
+Your terminal should look as follows:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+</pre>
+
+In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:
+
+<pre class="brush: bash;">
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all     1
+streams 1
+lead    1
+to      1
+kafka   1
+hello   1
+kafka   2
+streams 2
+</pre>
+
+Here the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate updates to the keys <b>kafka</b> and <b>streams</b> whose counts have been incremented from <b>1</b> to <b>2</b>.
+Whenever you write further input messages to the input topic, you will observe new messages being added to the <b>streams-wordcount-output</b> topic,
+representing the most recent word counts as computed by the WordCount application.
+Let's enter one final input text line "join kafka summit" and hit <RETURN> in the console producer to the input topic <b>streams-wordcount-input</b> before we wrap up this quickstart:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+join kafka summit
+</pre>
+
+The <b>streams-wordcount-output</b> topic will subsequently show the corresponding updated word counts (see last three lines):
+
+<pre class="brush: bash;">
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all     1
+streams 1
+lead    1
+to      1
+kafka   1
+hello   1
+kafka   2
+streams 2
+join    1
+kafka   3
+summit  1
+</pre>
+
 <p>
-Here, the first column is the Kafka message key in <code>java.lang.String</code> format, and the second column is the message value in <code>java.lang.Long</code> format.
-Note that the output is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is
+As one can see, outputs of the Wordcount application are actually a continuous stream of updates, where each output record (i.e. each line in the original output above) is
 an updated count of a single word, aka record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
 </p>
 
 <p>
 The two diagrams below illustrate what is essentially happening behind the scenes.
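Step 5's point about message framing, that each typed line becomes one record with a null key and the line as a string value, can be reproduced with the plain Java producer instead of the console tool. A minimal sketch, with an illustrative class name:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class QuickstartInputProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Null key, string value: exactly what the console producer sends per line.
            producer.send(new ProducerRecord<String, String>(
                    "streams-wordcount-input", null, "all streams lead to kafka"));
        }
    }
}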
@@ -217,13 +315,9 @@ And so on (we skip the illustration of how the third line is being processed). T
 Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.
 </p>
 
-<p>
-Now you can write more input messages to the <b>streams-file-input</b> topic and observe additional messages added
-to <b>streams-wordcount-output</b> topic, reflecting updated word counts (e.g., using the console producer and the
-console consumer, as described above).
-</p>
+<h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: Teardown the application</a></h4>
 
-<p>You can stop the console consumer via <b>Ctrl-C</b>.</p>
+<p>You can now stop the console consumer, the console producer, the Wordcount application, the Kafka broker and the ZooKeeper server in order via <b>Ctrl-C</b>.</p>
 
 <div class="pagination">
     <a href="/{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>
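The table/changelog duality described in the hunk above can be demonstrated without Kafka at all: replaying the changelog records printed in Step 5 into a map reconstructs the table. A small illustrative sketch:

import java.util.LinkedHashMap;
import java.util.Map;

public class ChangelogReplaySketch {
    public static void main(String[] args) {
        // The changelog stream as printed by the console consumer above,
        // in arrival order: (word, latest count).
        Object[][] changelog = {
                {"all", 1L}, {"streams", 1L}, {"lead", 1L}, {"to", 1L}, {"kafka", 1L},
                {"hello", 1L}, {"kafka", 2L}, {"streams", 2L},
                {"join", 1L}, {"kafka", 3L}, {"summit", 1L}
        };

        // Replaying the changelog from beginning to end reconstructs the table:
        // a later record for the same key overwrites the earlier one.
        Map<String, Long> table = new LinkedHashMap<>();
        for (Object[] record : changelog) {
            table.put((String) record[0], (Long) record[1]);
        }

        System.out.println(table);
        // {all=1, streams=2, lead=1, to=1, kafka=3, hello=1, join=1, summit=1}
    }
}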
@@ -141,6 +141,15 @@
                 <li><a href="#connect_development">8.3 Connector Development Guide</a></li>
             </ul>
         </li>
+        <li><a href="/{{version}}/documentation/streams">9. Kafka Streams</a>
+            <ul>
+                <li><a href="/{{version}}/documentation/streams/quickstart">9.1 Play with a Streams Application</a></li>
+                <li><a href="/{{version}}/documentation/streams/developer-guide">9.2 Developer Guide</a></li>
+                <li><a href="/{{version}}/documentation/streams/core-concepts">9.3 Core Concepts</a></li>
+                <li><a href="/{{version}}/documentation/streams/architecture">9.4 Architecture</a></li>
+                <li><a href="/{{version}}/documentation/streams/upgrade-guide">9.5 Upgrade Guide and API Changes</a></li>
+            </ul>
+        </li>
     </ul>
 
 </script>
@@ -18,11 +18,13 @@ package org.apache.kafka.streams.examples.pipe;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
@@ -51,13 +53,24 @@ public class PipeDemo {
 
         builder.stream("streams-file-input").to("streams-pipe-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import java.util.Arrays;
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
@@ -60,7 +62,7 @@ public class WordCountDemo {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source = builder.stream("streams-file-input");
+        KStream<String, String> source = builder.stream("streams-wordcount-input");
 
         KTable<String, Long> counts = source
             .flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -80,13 +82,24 @@ public class WordCountDemo {
         // need to override value serde to Long type
         counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KafkaStreams;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.state.Stores;
 
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
@@ -119,20 +121,31 @@ public class WordCountProcessorDemo {
 
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.addSource("Source", "streams-file-input");
+        builder.addSource("Source", "streams-wordcount-input");
 
         builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
         builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
 
         builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }