mirror of https://github.com/apache/kafka.git
528 lines
23 KiB
HTML
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<script><!--#include virtual="../js/templateData.js" --></script>

<script id="content-template" type="text/x-handlebars-template">
<h1>Write your own Streams Applications</h1>

<p>
In this guide we will start from scratch and set up your own project to write a stream processing application using Kafka's Streams API.
If you have not done so already, we highly recommend reading the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a> first, which shows how to run a Streams application.
</p>

<h4><a id="tutorial_maven_setup" href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>

<p>
We are going to use the Kafka Streams Maven Archetype to create a Streams project structure with the following command:
</p>

<pre class="brush: bash;">
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion={{fullDotVersion}} \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=myapps
</pre>

<p>
You can use different values for the <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
Assuming the above parameter values are used, this command will create a project directory named after the <code>artifactId</code> that looks like this:
</p>

<pre class="brush: bash;">
> tree streams.examples
streams.examples
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties
</pre>

<p>
<strong>Important:</strong> You must manually update the setting of <code><kafka.version></code> in the generated <code>pom.xml</code> file from <code>0.11.0.1-SNAPSHOT</code> to <code>0.11.0.1</code>.
<strong>Note:</strong> in the next release the above step will not be required.
The generated project already contains several example programs written with the Streams library under <code>src/main/java</code>.
Since we are going to write such programs from scratch, we can now delete these examples:
</p>

<pre class="brush: bash;">
> cd streams.examples
> rm src/main/java/myapps/*.java
</pre>

<h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>

<p>
It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a Java file under <code>src/main/java/myapps</code>.
Let's name it <code>Pipe.java</code>:
</p>

<pre class="brush: java;">
package myapps;

public class Pipe {

    public static void main(String[] args) throws Exception {

    }
}
</pre>

<p>
We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go, since IDEs can usually add them automatically.
However, if you are using a text editor you need to add the imports manually; at the end of this section we show the complete code snippet including the import statements.
</p>

<p>
The first step in writing a Streams application is to create a <code>java.util.Properties</code> map that specifies the Streams execution configuration values defined in <code>StreamsConfig</code>.
Two important configuration values you need to set are <code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code>, which specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster,
and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application to distinguish it from other applications talking to the same Kafka cluster:
</p>

<pre class="brush: java;">
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
</pre>

<p>
In addition, you can customize other configurations in the same map, for example the default serialization and deserialization classes (serdes) for the record key-value pairs:
</p>

<pre class="brush: java;">
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
</pre>
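
<p>
As a rough, library-free sketch of what this map ends up holding: to our understanding, the four constants above resolve to the plain string keys <code>application.id</code>, <code>bootstrap.servers</code>, <code>default.key.serde</code> and <code>default.value.serde</code>.
The class <code>StreamsPropsSketch</code> and its helper methods are hypothetical and not part of Kafka, and the serde class is given by name so that the snippet compiles without Kafka on the classpath.
</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds the same settings with literal keys.
class StreamsPropsSketch {

    // Assumed literal values of the StreamsConfig constants used in this tutorial.
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String DEFAULT_KEY_SERDE = "default.key.serde";
    static final String DEFAULT_VALUE_SERDE = "default.value.serde";

    // Binary name of the class returned by Serdes.String().getClass().
    static final String STRING_SERDE = "org.apache.kafka.common.serialization.Serdes$StringSerde";

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put(APPLICATION_ID, applicationId);
        props.put(BOOTSTRAP_SERVERS, bootstrapServers);
        props.put(DEFAULT_KEY_SERDE, STRING_SERDE);
        props.put(DEFAULT_VALUE_SERDE, STRING_SERDE);
        return props;
    }

    // Returns the mandatory keys that are absent, so a typo shows up before startup.
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : Arrays.asList(APPLICATION_ID, BOOTSTRAP_SERVERS)) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = build("streams-pipe", "localhost:9092");
        System.out.println(props.getProperty(APPLICATION_ID));
        System.out.println(missingRequired(props));
    }
}
```

<p>
In the tutorial code itself the <code>StreamsConfig</code> constants remain preferable to literal keys, since a misspelled constant fails at compile time.
</p>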

<p>
For a full list of configurations of Kafka Streams please refer to this <a href="/{{version}}/documentation/#streamsconfigs">table</a>.
</p>

<p>
Next we will define the computational logic of our Streams application.
In Kafka Streams this computational logic is defined as a <code>topology</code> of connected processor nodes.
We can use a topology builder to construct such a topology:
</p>

<pre class="brush: java;">
final KStreamBuilder builder = new KStreamBuilder();
</pre>

<p>
We then create a source stream from a Kafka topic named <code>streams-plaintext-input</code> using this topology builder:
</p>

<pre class="brush: java;">
KStream<String, String> source = builder.stream("streams-plaintext-input");
</pre>

<p>
Now we have a <code>KStream</code> that continuously generates records from its source Kafka topic <code>streams-plaintext-input</code>.
The records are organized as <code>String</code> typed key-value pairs.
The simplest thing we can do with this stream is to write it into another Kafka topic, say one named <code>streams-pipe-output</code>:
</p>

<pre class="brush: java;">
source.to("streams-pipe-output");
</pre>

<p>
Note that we can also chain the two calls above into a single statement:
</p>

<pre class="brush: java;">
builder.stream("streams-plaintext-input").to("streams-pipe-output");
</pre>

<p>
We can now construct the Streams client from the two components we have just created: the configuration map and the topology builder object
(alternatively, one can construct a <code>StreamsConfig</code> object from the <code>props</code> map and pass that object to the constructor;
<code>KafkaStreams</code> has overloaded constructors that take either type).
</p>

<pre class="brush: java;">
final KafkaStreams streams = new KafkaStreams(builder, props);
</pre>

<p>
By calling its <code>start()</code> function we can trigger the execution of this client.
The execution won't stop until <code>close()</code> is called on this client.
We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
</p>

<pre class="brush: java;">
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);
</pre>
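
<p>
The latch pattern itself does not depend on Kafka. The sketch below replays it with a hypothetical <code>FakeClient</code> standing in for <code>KafkaStreams</code>, and an ordinary thread standing in for the JVM shutdown hook so that the example terminates by itself:
the calling thread blocks in <code>await</code> until the other thread has closed the client and counted the latch down.
</p>

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownLatchSketch {

    // Hypothetical stand-in for KafkaStreams: it only tracks whether it is running.
    static class FakeClient {
        private final AtomicBoolean running = new AtomicBoolean(false);

        void start() {
            running.set(true);
        }

        void close() {
            running.set(false);
        }

        boolean isRunning() {
            return running.get();
        }
    }

    // Starts the client, waits for the "hook" thread, and reports whether the
    // latch was released and the client was closed by then.
    static boolean runUntilClosed(final FakeClient client) {
        final CountDownLatch latch = new CountDownLatch(1);

        // In the tutorial this thread is registered via Runtime.addShutdownHook(...).
        Thread hook = new Thread("fake-shutdown-hook") {
            @Override
            public void run() {
                client.close();     // stop the client first
                latch.countDown();  // then release the waiting thread
            }
        };

        client.start();
        hook.start(); // simulates control-c arriving
        try {
            boolean released = latch.await(5, TimeUnit.SECONDS);
            return released && !client.isRunning();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runUntilClosed(new FakeClient()));
    }
}
```

<p>
Closing the client before counting down matters: once the latch is released the main thread proceeds to exit, so the client should already be stopped at that point.
</p>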

<p>
The complete code so far looks like this:
</p>

<pre class="brush: java;">
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KStreamBuilder builder = new KStreamBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
</pre>

<p>
If you already have a Kafka broker up and running at <code>localhost:9092</code>,
and the topics <code>streams-plaintext-input</code> and <code>streams-pipe-output</code> created on that broker,
you can run this code in your IDE or on the command line, using Maven:
</p>

<pre class="brush: bash;">
> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.Pipe
</pre>

<p>
For detailed instructions on how to run a Streams application and observe its computing results,
please read the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
We will not repeat these instructions in the rest of this section.
</p>

<h4><a id="tutorial_code_linesplit" href="#tutorial_code_linesplit">Writing a second Streams application: Line Split</a></h4>

<p>
We have learned how to construct a Streams client from its two key components: the <code>StreamsConfig</code> and the <code>TopologyBuilder</code> (here a <code>KStreamBuilder</code>).
Now let's move on and add some real processing logic by augmenting the current topology.
We first create another program by copying the existing <code>Pipe.java</code> class:
</p>

<pre class="brush: bash;">
> cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
</pre>

<p>
Then change its class name as well as the application id config to distinguish it from the original program:
</p>

<pre class="brush: java;">
public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        // ...
    }
}
</pre>

<p>
Since each of the source stream's records is a <code>String</code> typed key-value pair,
let's treat the value string as a text line and split it into words with a <code>flatMapValues</code> operator:
</p>

<pre class="brush: java;">
KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        return Arrays.asList(value.split("\\W+"));
    }
});
</pre>

<p>
The operator takes the <code>source</code> stream as its input and generates a new stream named <code>words</code>
by processing each record from its source stream in order, breaking its value string into a list of words, and producing
each word as a new record to the output <code>words</code> stream.
This is a stateless operator that does not need to keep track of any previously received records or processed results.
Note that if you are using JDK 8 you can use a lambda expression to simplify the above code as:
</p>

<pre class="brush: java;">
KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
</pre>
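
<p>
To see what the mapper does to a single value, the split can be tried outside of Kafka. The sketch below uses <code>java.util.stream</code> as a stand-in for the stream of record values; the class and method names are hypothetical.
One detail worth knowing: when a line starts with a non-word character, <code>split("\\W+")</code> returns an empty string as its first element, and the mapper as written would pass it on as a word.
</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class LineSplitSketch {

    // The same function that is passed to flatMapValues in the tutorial.
    static List<String> splitLine(String value) {
        return Arrays.asList(value.split("\\W+"));
    }

    // Plain-Java analogy of flatMapValues: every input line yields zero or more words.
    static List<String> flatMapValues(List<String> lines) {
        return lines.stream()
                .flatMap(line -> splitLine(line).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // One list of words for two input lines.
        System.out.println(flatMapValues(Arrays.asList("all streams lead to kafka", "hello, kafka streams!")));
        // The leading space produces an empty first element.
        System.out.println(splitLine(" leading space"));
    }
}
```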

<p>
And finally we can write the word stream back into another Kafka topic, say <code>streams-linesplit-output</code>.
Again, these two steps can be chained as follows (assuming a lambda expression is used):
</p>

<pre class="brush: java;">
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
      .to("streams-linesplit-output");
</pre>

<p>
The complete code looks like this (assuming a lambda expression is used):
</p>

<pre class="brush: java;">
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
              .to("streams-linesplit-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... the rest is the same as in Pipe.java above
    }
}
</pre>

<h4><a id="tutorial_code_wordcount" href="#tutorial_code_wordcount">Writing a third Streams application: Wordcount</a></h4>

<p>
Let's now take a step further and add some "stateful" computations to the topology by counting the occurrences of the words split from the source text stream.
Following similar steps, let's create another program based on the <code>LineSplit.java</code> class:
</p>

<pre class="brush: java;">
public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        // ...
    }
}
</pre>

<p>
In order to count the words we can first modify the <code>flatMapValues</code> operator to treat all of them as lower case:
</p>

<pre class="brush: java;">
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
    }
});
</pre>

<p>
In order to do the counting aggregation we first have to specify that we want to key the stream on the value string, i.e. the lower-cased word, with a <code>groupBy</code> operator.
This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator that maintains a running count for each of the grouped keys:
</p>

<pre class="brush: java;">
KTable<String, Long> counts =
    source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
              @Override
              public Iterable<String> apply(String value) {
                  return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
              }
          })
          .groupBy(new KeyValueMapper<String, String, String>() {
              @Override
              public String apply(String key, String value) {
                  return value;
              }
          })
          .count("Counts");
</pre>
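
<p>
As an intuition for what <code>groupBy</code> followed by <code>count</code> computes, the following plain-Java sketch re-keys each word by itself and keeps a running count per key in a map, recording one update for every processed word.
It is only an analogy with hypothetical names: the real operator keeps its counts in a state store managed by Kafka Streams, and the library may buffer updates so that fewer intermediate values are emitted.
</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class RunningCountSketch {

    // Stand-in for the "Counts" state store: word -> running count.
    private final Map<String, Long> counts = new LinkedHashMap<>();

    // Processes one text line and returns the updates it caused, in order, as "word=count".
    List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        for (String word : line.toLowerCase(Locale.getDefault()).split("\\W+")) {
            if (word.isEmpty()) {
                // split can yield an empty first element; this sketch skips it,
                // which the tutorial code does not do
                continue;
            }
            long updated = counts.merge(word, 1L, Long::sum); // groupBy(word) + count
            updates.add(word + "=" + updated);
        }
        return updates;
    }

    // Copy of the current counts, comparable to querying the store.
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(counts);
    }

    public static void main(String[] args) {
        RunningCountSketch sketch = new RunningCountSketch();
        System.out.println(sketch.process("Hello Kafka"));
        System.out.println(sketch.process("hello streams"));
        System.out.println(sketch.snapshot());
    }
}
```

<p>
Each returned entry corresponds to one record of the table's changelog stream: a later update for the same word replaces the earlier one instead of adding to it.
</p>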

<p>
Note that the <code>count</code> operator takes a <code>String</code> typed parameter, here <code>Counts</code>,
which names the state store that holds the running counts; these keep being updated as more records are piped and processed from the source Kafka topic.
This <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
</p>

<p>
We can also write the <code>counts</code> KTable's changelog stream back into another Kafka topic, say <code>streams-wordcount-output</code>.
Note that this time the value type is no longer <code>String</code> but <code>Long</code>, so the default serialization classes can no longer be used for writing it to Kafka.
We need to pass the serdes for the <code>String</code> key and the <code>Long</code> value explicitly, otherwise a runtime exception will be thrown:
</p>

<pre class="brush: java;">
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
</pre>
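
<p>
The reason the <code>String</code> serde cannot be reused here is that the two types have different wire formats.
As a hedged illustration that does not use Kafka's classes: to our understanding the <code>Long</code> serde writes the value as 8 big-endian bytes, which the sketch below reproduces with <code>java.nio.ByteBuffer</code> (the class and method names are hypothetical).
Decoding those bytes as UTF-8 text does not give back the digits of the number.
</p>

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class LongWireFormatSketch {

    // Assumed layout of a serialized Long: 8 bytes, most significant byte first.
    static byte[] serialize(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long deserialize(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes but got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    // What a reader sees if it wrongly treats the same bytes as UTF-8 text.
    static String misreadAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(3L);
        System.out.println(bytes.length);
        System.out.println(deserialize(bytes));
        System.out.println("3".equals(misreadAsString(bytes)));
    }
}
```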

<p>
Note that in order to read the changelog stream from topic <code>streams-wordcount-output</code>,
one needs to set the value deserializer to <code>org.apache.kafka.common.serialization.LongDeserializer</code>.
Details of this can be found in the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
Assuming lambda expressions from JDK 8 can be used, the above code can be simplified as:
</p>

<pre class="brush: java;">
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
      .groupBy((key, value) -> value)
      .count("Counts")
      .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
</pre>

<p>
The complete code looks like this (assuming lambda expressions are used):
</p>

<pre class="brush: java;">
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
              .groupBy((key, value) -> value)
              .count("Counts")
              .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... the rest is the same as in Pipe.java above
    }
}
</pre>

<div class="pagination">
    <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__prev">Previous</a>
    <a href="/{{version}}/documentation/streams/developer-guide" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>

<div class="p-quickstart-streams"></div>

<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation documentation--current">
    <!--#include virtual="../../includes/_nav.htm" -->
    <div class="right">
        <!--#include virtual="../../includes/_docs_banner.htm" -->
        <ul class="breadcrumbs">
            <li><a href="/documentation">Documentation</a></li>
            <li><a href="/documentation/streams">Streams</a></li>
        </ul>
        <div class="p-content"></div>
    </div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
    // Show selected style on nav item
    $('.b-nav__streams').addClass('selected');

    // Display docs subnav items
    $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>