mirror of https://github.com/apache/kafka.git
KAFKA-5727: Add Streams quickstart tutorial as an archetype project
0. Minor fixes on the existing examples to merge all on a single input topic; also do not use `common.utils.Exit` as it is for internal usage only. 1. Add the archetype project for the quickstart. Steps to try it out: a. `mvn install` on the quickstart directory. b. `mvn archetype:generate \ -DarchetypeGroupId=org.apache.kafka \ -DarchetypeArtifactId=streams-quickstart-java \ -DarchetypeVersion=1.0.0-SNAPSHOT \ -DgroupId=streams-quickstart \ -DartifactId=streams-quickstart \ -Dversion=0.1 \ -Dpackage=StreamsQuickstart \ -DinteractiveMode=false` at any directory to create the project. c. build the streams jar with version `1.0.0-SNAPSHOT` to local maven repository with `./gradlew installAll`; `cd streams-quickstart; mvn clean package` d. create the input / output topics, start the console producer and consumer. e. start the program: `mvn exec:java -Dexec.mainClass=StreamsQuickstart.Pipe/LineSplit/WordCount`. f. type data on console producer and observe data on console consumer. Author: Guozhang Wang <wangguoz@gmail.com> Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Ewen Cheslack-Postava <me@ewencp.org>, Eno Thereska <eno.thereska@gmail.com> Closes #3630 from guozhangwang/KMinor-streams-quickstart-tutorial
This commit is contained in:
parent
3e69ce8015
commit
d2774e302f
30
README.md
30
README.md
|
@ -127,6 +127,36 @@ Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradl
|
|||
signing.password=
|
||||
signing.secretKeyRingFile=
|
||||
|
||||
### Publishing the streams quickstart archetype artifact to maven ###
|
||||
For the Streams archetype project, one cannot use gradle to upload to maven; instead the `mvn deploy` command needs to be called at the quickstart folder:
|
||||
|
||||
cd streams/quickstart
|
||||
mvn deploy
|
||||
|
||||
Please note for this to work you should create/update user maven settings (typically, `${USER_HOME}/.m2/settings.xml`) to assign the following variables
|
||||
|
||||
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
|
||||
https://maven.apache.org/xsd/settings-1.0.0.xsd">
|
||||
...
|
||||
<servers>
|
||||
...
|
||||
<server>
|
||||
<id>apache.snapshots.https</id>
|
||||
<username>${maven_username}</username>
|
||||
<password>${maven_password}</password>
|
||||
</server>
|
||||
<server>
|
||||
<id>apache.releases.https</id>
|
||||
<username>${maven_username}</username>
|
||||
<password>${maven_password}</password>
|
||||
</server>
|
||||
...
|
||||
</servers>
|
||||
...
|
||||
|
||||
|
||||
### Installing the jars to the local Maven repository ###
|
||||
./gradlew installAll
|
||||
|
||||
|
|
|
@ -121,7 +121,8 @@ if (new File('.git').exists()) {
|
|||
'**/README.md',
|
||||
'**/id_rsa',
|
||||
'**/id_rsa.pub',
|
||||
'checkstyle/suppressions.xml'
|
||||
'checkstyle/suppressions.xml',
|
||||
'streams/quickstart/java/src/test/resources/projects/basic/goal.txt'
|
||||
])
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- should always link the the latest release's documentation -->
|
||||
<!--#include virtual="../../streams/tutorial.html" -->
|
|
@ -17,8 +17,8 @@ limitations under the License.
|
|||
|
||||
// Define variables for doc templates
|
||||
var context={
|
||||
"version": "0110",
|
||||
"dotVersion": "0.11.0",
|
||||
"fullDotVersion": "0.11.0.0"
|
||||
"version": "100",
|
||||
"dotVersion": "1.0",
|
||||
"fullDotVersion": "1.0.0"
|
||||
"scalaVersion:" "2.11"
|
||||
};
|
||||
|
|
|
@ -40,10 +40,10 @@ of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/stream
|
|||
final Serde<String> stringSerde = Serdes.String();
|
||||
final Serde<Long> longSerde = Serdes.Long();
|
||||
|
||||
// Construct a `KStream` from the input topic "streams-wordcount-input", where message values
|
||||
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
|
||||
// represent lines of text (for the sake of this example, we ignore whatever may be stored
|
||||
// in the message keys).
|
||||
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-wordcount-input");
|
||||
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input");
|
||||
|
||||
KTable<String, Long> wordCounts = textLines
|
||||
// Split each text line, by whitespace, into words.
|
||||
|
@ -120,15 +120,15 @@ Or on Windows:
|
|||
|
||||
-->
|
||||
|
||||
Next, we create the input topic named <b>streams-wordcount-input</b> and the output topic named <b>streams-wordcount-output</b>:
|
||||
Next, we create the input topic named <b>streams-plaintext-input</b> and the output topic named <b>streams-wordcount-output</b>:
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> bin/kafka-topics.sh --create \
|
||||
--zookeeper localhost:2181 \
|
||||
--replication-factor 1 \
|
||||
--partitions 1 \
|
||||
--topic streams-wordcount-input
|
||||
Created topic "streams-wordcount-input".
|
||||
--topic streams-plaintext-input
|
||||
Created topic "streams-plaintext-input".
|
||||
|
||||
> bin/kafka-topics.sh --create \
|
||||
--zookeeper localhost:2181 \
|
||||
|
@ -143,8 +143,8 @@ The created topic can be described with the same <b>kafka-topics</b> tool:
|
|||
<pre class="brush: bash;">
|
||||
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
|
||||
|
||||
Topic:streams-wordcount-input PartitionCount:1 ReplicationFactor:1 Configs:
|
||||
Topic: streams-wordcount-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0
|
||||
Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs:
|
||||
Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0
|
||||
Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:
|
||||
Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
|
||||
</pre>
|
||||
|
@ -158,7 +158,7 @@ The following command starts the WordCount demo application:
|
|||
</pre>
|
||||
|
||||
<p>
|
||||
The demo application will read from the input topic <b>streams-wordcount-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
|
||||
The demo application will read from the input topic <b>streams-plaintext-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
|
||||
and continuously write its current results to the output topic <b>streams-wordcount-output</b>.
|
||||
Hence there won't be any STDOUT output except log entries as the results are written back into in Kafka.
|
||||
</p>
|
||||
|
@ -166,7 +166,7 @@ Hence there won't be any STDOUT output except log entries as the results are wri
|
|||
Now we can start the console producer in a separate terminal to write some input data to this topic:
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
|
||||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
|
||||
</pre>
|
||||
|
||||
and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
|
||||
|
@ -185,12 +185,12 @@ and inspect the output of the WordCount demo application by reading from its out
|
|||
|
||||
<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5: Process some data</a></h4>
|
||||
|
||||
Now let's write some message with the console producer into the input topic <b>streams-wordcount-input</b> by entering a single line of text and then hit <RETURN>.
|
||||
Now let's write some message with the console producer into the input topic <b>streams-plaintext-input</b> by entering a single line of text and then hit <RETURN>.
|
||||
This will send a new message to the input topic, where the message key is null and the message value is the string encoded text line that you just entered
|
||||
(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
|
||||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
|
||||
all streams lead to kafka
|
||||
</pre>
|
||||
|
||||
|
@ -219,12 +219,12 @@ kafka 1
|
|||
Here, the first column is the Kafka message key in <code>java.lang.String</code> format and represents a word that is being counted, and the second column is the message value in <code>java.lang.Long</code>format, representing the word's latest count.
|
||||
</p>
|
||||
|
||||
Now let's continue writing one more message with the console producer into the input topic <b>streams-wordcount-input</b>.
|
||||
Now let's continue writing one more message with the console producer into the input topic <b>streams-plaintext-input</b>.
|
||||
Enter the text line "hello kafka streams" and hit <RETURN>.
|
||||
Your terminal should look as follows:
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
|
||||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
|
||||
all streams lead to kafka
|
||||
hello kafka streams
|
||||
</pre>
|
||||
|
@ -321,7 +321,7 @@ Looking beyond the scope of this concrete example, what Kafka Streams is doing h
|
|||
|
||||
<div class="pagination">
|
||||
<a href="/{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>
|
||||
<a href="/{{version}}/documentation/streams/developer-guide" class="pagination__btn pagination__btn__next">Next</a>
|
||||
<a href="/{{version}}/documentation/streams/tutorial" class="pagination__btn pagination__btn__next">Next</a>
|
||||
</div>
|
||||
</script>
|
||||
|
||||
|
|
|
@ -0,0 +1,630 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<script><!--#include virtual="../js/templateData.js" --></script>
|
||||
|
||||
<script id="content-template" type="text/x-handlebars-template">
|
||||
<h1>Write your own Streams Applications</h1>
|
||||
|
||||
<p>
|
||||
In this guide we will start from scratch on setting up your own project to write a stream processing application using Kafka Streams.
|
||||
It is highly recommended to read the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a> first on how to run a Streams application written in Kafka Streams if you have not done so.
|
||||
</p>
|
||||
|
||||
<h4><a id="tutorial_maven_setup" href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
|
||||
|
||||
<p>
|
||||
We are going to use a Kafka Streams Maven Archetype for creating a Streams project structure with the following commands:
|
||||
</p>
|
||||
|
||||
<pre class="brush: bash;">
|
||||
mvn archetype:generate \
|
||||
-DarchetypeGroupId=org.apache.kafka \
|
||||
-DarchetypeArtifactId=streams-quickstart-java \
|
||||
-DarchetypeVersion={{fullDotVersion}} \
|
||||
-DgroupId=streams.examples \
|
||||
-DartifactId=streams.examples \
|
||||
-Dversion=0.1 \
|
||||
-Dpackage=myapps
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
You can use a different value for <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
|
||||
Assuming the above parameter values are used, this command will create a project structure that looks like this:
|
||||
</p>
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> tree streams.examples
|
||||
streams-quickstart
|
||||
├── pom.xml
|
||||
└── src
|
||||
└── main
|
||||
├── java
|
||||
│ └── myapps
|
||||
│ ├── LineSplit.java
|
||||
│ ├── Pipe.java
|
||||
│ └── WordCount.java
|
||||
└── resources
|
||||
└── log4j.properties
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
The <code>pom.xml</code> file included in the project already has the Streams dependency defined,
|
||||
and there are already several example programs written with Streams library under <code>src/main/java</code>.
|
||||
Since we are going to start writing such programs from scratch, we can now delete these examples:
|
||||
</p>
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> cd streams-quickstart
|
||||
> rm src/main/java/myapps/*.java
|
||||
</pre>
|
||||
|
||||
<h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
|
||||
|
||||
It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a java file under <code>src/main/java</code>.
|
||||
Let's name it <code>Pipe.java</code>:
|
||||
|
||||
<pre class="brush: java;">
|
||||
package myapps;
|
||||
|
||||
public class Pipe {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go since IDEs can usually add them automatically.
|
||||
However if you are using a text editor you need to manually add the imports, and at the end of this section we'll show the complete code snippet with import statement for you.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The first step to write a Streams application is to create a <code>java.util.Properties</code> map to specify different Streams execution configuration values as defined in <code>StreamsConfig</code>.
|
||||
A couple of important configuration values you need to set are: <code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code>, which specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster,
|
||||
and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application to distinguish itself with other applications talking to the same Kafka cluster:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
For a full list of configurations of Kafka Streams please refer to this <a href="/{{version}}/documentation/#streamsconfigs">table</a>.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Next we will define the computational logic of our Streams application.
|
||||
In Kafka Streams this computational logic is defined as a <code>topology</code> of connected processor nodes.
|
||||
We can use a topology builder to construct such a topology,
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
And then create a source stream from a Kafka topic named <code>streams-plaintext-input</code> using this topology builder:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
Now we get a <code>KStream</code> that is continuously generating records from its source Kafka topic <code>streams-plaintext-input</code>.
|
||||
The records are organized as <code>String</code> typed key-value pairs.
|
||||
The simplest thing we can do with this stream is to write it into another Kafka topic, say it's named <code>streams-pipe-output</code>:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
source.to("streams-pipe-output");
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
Note that we can also concatenate the above two lines into a single line as:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
builder.stream("streams-plaintext-input").to("streams-pipe-output");
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
We can inspect what kind of <code>topology</code> is created from this builder by doing the following:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
final Topology topology = builder.build();
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
And print its description to standard output as:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
System.out.println(topology.describe());
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
If we just stop here, compile and run the program, it will output the following information:
|
||||
</p>
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.Pipe
|
||||
Sub-topologies:
|
||||
Sub-topology: 0
|
||||
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
|
||||
Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
|
||||
Global Stores:
|
||||
none
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
As shown above, it illustrate that the constructed topology has two processor nodes, a source node <code>KSTREAM-SOURCE-0000000000</code> and a sink node <code>KSTREAM-SINK-0000000001</code>.
|
||||
<code>KSTREAM-SOURCE-0000000000</code> continuously read records from Kafka topic <code>streams-plaintext-input</code> and pipe them to its downstream node <code>KSTREAM-SINK-0000000001</code>;
|
||||
<code>KSTREAM-SINK-0000000001</code> will write each of its received record in order to another Kafka topic <code>streams-pipe-output</code>
|
||||
(the <code>--></code> and <code><--</code> arrows dictates the downstream and upstream processor nodes of this node, i.e. "children" and "parents" within the topology graph).
|
||||
It also illustrates that this simple topology has no global state stores associated with it (we will talk about state stores more in the following sections).
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Note that we can always describe the topology as we did above at any given point while we are building it in the code, so as a user you can interactively "try and taste" your computational logic defined in the topology until you are happy with it.
|
||||
Suppose we are already done with this simple topology that just pipes data from one Kafka topic to another in an endless streaming manner,
|
||||
we can now construct the Streams client with the two components we have just constructed above: the configuration map and the topology object
|
||||
(one can also construct a <code>StreamsConfig</code> object from the <code>props</code> map and then pass that object to the constructor,
|
||||
<code>KafkaStreams</code> have overloaded constructor functions to takes either type).
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
By calling its <code>start()</code> function we can trigger the execution of this client.
|
||||
The execution won't stop until <code>close()</code> is called on this client.
|
||||
We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// attach shutdown handler to catch control-c
|
||||
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
|
||||
@Override
|
||||
public void run() {
|
||||
streams.close();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
System.exit(1);
|
||||
}
|
||||
System.exit(0);
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
The complete code so far looks like this:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
package myapps;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Pipe {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
builder.stream("streams-plaintext-input").to("streams-pipe-output");
|
||||
|
||||
final Topology topology = builder.build();
|
||||
|
||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// attach shutdown handler to catch control-c
|
||||
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
|
||||
@Override
|
||||
public void run() {
|
||||
streams.close();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
System.exit(1);
|
||||
}
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
If you already have the Kafka broker up and running at <code>localhost:9092</code>,
|
||||
and the topics <code>streams-plaintext-input</code> and <code>streams-pipe-output</code> created on that broker,
|
||||
you can run this code in your IDE or on the command line, using Maven:
|
||||
</p>
|
||||
|
||||
<pre class="brush: brush;">
|
||||
> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.Pipe
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
For detailed instructions on how to run a Streams application and observe its computing results,
|
||||
please read the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
|
||||
We will not talk about this in the rest of this section.
|
||||
</p>
|
||||
|
||||
<h4><a id="tutorial_code_linesplit" href="#tutorial_code_linesplit">Writing a second Streams application: Line Split</a></h4>
|
||||
|
||||
<p>
|
||||
We have learned how to construct a Streams client with its two key components: the <code>StreamsConfig</code> and <code>Topology</code>.
|
||||
Now let's move on to add some real processing logic by augmenting the current topology.
|
||||
We can first create another program by first copy the existing <code>Pipe.java</code> class:
|
||||
</p>
|
||||
|
||||
<pre class="brush: brush;">
|
||||
> cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
And change its class name as well as the application id config to distinguish with the original program:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
public class Pipe {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
|
||||
// ...
|
||||
}
|
||||
}
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
Since each of the source stream's record is a <code>String</code> typed key-value pair,
|
||||
let's treat the value string as a text line and split it into words with a <code>FlatMapValues</code> operator:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
KStream<String, String> words = builder.flatMapValues(new ValueMapper<String, Iterable<String>>() {
|
||||
@Override
|
||||
public Iterable<String> apply(String value) {
|
||||
return Arrays.asList(value.split("\\W+"));
|
||||
}
|
||||
});
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
The operator will take the <code>source</code> stream as its input, and generate a new stream named <code>words</code>
|
||||
by processing each record from its source stream in order and breaking its value string into a list of words, and producing
|
||||
each word as a new record to the output <code>words</code> stream.
|
||||
This is a stateless operator that does not need to keep track of any previously received records or processed results.
|
||||
Note if you are using JDK 8 you can use lambda expression and simplify the above code as:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
And finally we can write the word stream back into another Kafka topic, say <code>streams-linesplit-output</code>.
|
||||
Again, these two steps can be concatenated as the following (assuming lambda expression is used):
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
|
||||
.to("streams-linesplit-output");
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
If we now describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
|
||||
</p>
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.LineSplit
|
||||
Sub-topologies:
|
||||
Sub-topology: 0
|
||||
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
|
||||
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
|
||||
Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
|
||||
Global Stores:
|
||||
none
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
As we can see above, a new processor node <code>KSTREAM-FLATMAPVALUES-0000000001</code> is injected into the topology between the original source and sink nodes.
|
||||
It takes the source node as its parent and the sink node as its child.
|
||||
In other words, each record fetched by the source node will first traverse to the newly added <code>KSTREAM-FLATMAPVALUES-0000000001</code> node to be processed,
|
||||
and one or more new records will be generated as a result. They will continue traverse down to the sink node to be written back to Kafka.
|
||||
Note this processor node is "stateless" as it is not associated with any stores (i.e. <code>(stores: [])</code>).
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The complete code looks like this (assuming lambda expression is used):
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
package myapps;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class LineSplit {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
|
||||
.to("streams-linesplit-output");
|
||||
|
||||
final Topology topology = builder.build();
|
||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// ... same as Pipe.java below
|
||||
}
|
||||
}
|
||||
</pre>
|
||||
|
||||
<h4><a id="tutorial_code_wordcount" href="#tutorial_code_wordcount">Writing a third Streams application: Wordcount</a></h4>
|
||||
|
||||
<p>
|
||||
Let's now take a step further to add some "stateful" computations to the topology by counting the occurrence of the words split from the source text stream.
|
||||
Following similar steps let's create another program based on the <code>LineSplit.java</code> class:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
public class WordCount {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
|
||||
// ...
|
||||
}
|
||||
}
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
In order to count the words we can first modify the <code>flatMapValues</code> operator to treat all of them as lower case (assuming lambda expression is used):
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
|
||||
@Override
|
||||
public Iterable<String> apply(String value) {
|
||||
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
|
||||
}
|
||||
});
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower cased word, with a <code>groupBy</code> operator.
|
||||
This operator generate a new grouped stream, which can then be aggregated by a <code>count</code> operator, which generates a running count on each of the grouped keys:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
KTable<String, Long> counts =
|
||||
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
|
||||
@Override
|
||||
public Iterable<String> apply(String value) {
|
||||
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
|
||||
}
|
||||
})
|
||||
.groupBy(new KeyValueMapper<String, String, String>() {
|
||||
@Override
|
||||
public String apply(String key, String value) {
|
||||
return value;
|
||||
}
|
||||
})
|
||||
.count("Counts");
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
Note that the <code>count</code> operator has a <code>String</code> typed parameter <code>Counts</code>,
|
||||
which stores the running counts that keep being updated as more records are piped and processed from the source Kafka topic.
|
||||
This <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
We can also write the <code>counts</code> KTable's changelog stream back into another Kafka topic, say <code>streams-wordcount-output</code>.
|
||||
Note that this time the value type is no longer <code>String</code> but <code>Long</code>, so the default serialization classes are not viable for writing it to Kafka anymore.
|
||||
We need to provide overridden serialization methods for <code>Long</code> types, otherwise a runtime exception will be thrown:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
Note that in order to read the changelog stream from topic <code>streams-wordcount-output</code>,
|
||||
one needs to set the value deserialization as <code>org.apache.kafka.common.serialization.LongDeserializer</code>.
|
||||
Details of this can be found in the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
|
||||
Assuming lambda expression from JDK 8 can be used, the above code can be simplified as:
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
||||
.groupBy((key, value) -> value)
|
||||
.count("Counts")
|
||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
If we again describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
|
||||
</p>
|
||||
|
||||
<pre class="brush: bash;">
|
||||
> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.WordCount
|
||||
Sub-topologies:
|
||||
Sub-topology: 0
|
||||
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
|
||||
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
|
||||
Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
|
||||
Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
|
||||
Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
|
||||
Sub-topology: 1
|
||||
Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
|
||||
Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
|
||||
Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
|
||||
Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
|
||||
Global Stores:
|
||||
none
|
||||
</pre>
|
||||
|
||||
<p>
|
||||
As we can see above, the topology now contains two disconnected sub-topologies.
|
||||
The first sub-topology's sink node <code>KSTREAM-SINK-0000000004</code> will write to a repartition topic <code>Counts-repartition</code>,
|
||||
which will be read by the second sub-topology's source node <code>KSTREAM-SOURCE-0000000006</code>.
|
||||
The repartition topic is used to "shuffle" the source stream by its aggregation key, which is in this case the value string.
|
||||
In addition, inside the first sub-topology a stateless <code>KSTREAM-FILTER-0000000005</code> node is injected between the grouping <code>KSTREAM-KEY-SELECT-0000000002</code> node and the sink node to filter out any intermediate record whose aggregate key is empty.
|
||||
</p>
|
||||
<p>
|
||||
In the second sub-topology, the aggregation node <code>KSTREAM-AGGREGATE-0000000003</code> is associated with a state store named <code>Counts</code> (the name is specified by the user in the <code>count</code> operator).
|
||||
Upon receiving each record from its upcoming stream source node, the aggregation processor will first query its associated <code>Counts</code> store to get the current count for that key, augment by one, and then write the new count back to the store.
|
||||
Each updated count for the key will also be piped downstream to the <code>KTABLE-TOSTREAM-0000000007</code> node, which interpret this update stream as a record stream before further piping to the sink node <code>KSTREAM-SINK-0000000008</code> for writing back to Kafka.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The complete code looks like this (assuming lambda expression is used):
|
||||
</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
package myapps;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class WordCount {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
||||
.groupBy((key, value) -> value)
|
||||
.count("Counts")
|
||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
||||
|
||||
final Topology topology = builder.build();
|
||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// ... same as Pipe.java below
|
||||
}
|
||||
}
|
||||
</pre>
|
||||
|
||||
<div class="pagination">
|
||||
<a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__prev">Previous</a>
|
||||
<a href="/{{version}}/documentation/streams/developer-guide" class="pagination__btn pagination__btn__next">Next</a>
|
||||
</div>
|
||||
</script>
|
||||
|
||||
<div class="p-quickstart-streams"></div>
|
||||
|
||||
<!--#include virtual="../../includes/_header.htm" -->
|
||||
<!--#include virtual="../../includes/_top.htm" -->
|
||||
<div class="content documentation documentation--current">
|
||||
<!--#include virtual="../../includes/_nav.htm" -->
|
||||
<div class="right">
|
||||
<!--#include virtual="../../includes/_docs_banner.htm" -->
|
||||
<ul class="breadcrumbs">
|
||||
<li><a href="/documentation">Documentation</a></li>
|
||||
<li><a href="/documentation/streams">Streams</a></li>
|
||||
</ul>
|
||||
<div class="p-content"></div>
|
||||
</div>
|
||||
</div>
|
||||
<!--#include virtual="../../includes/_footer.htm" -->
|
||||
<script>
|
||||
$(function() {
|
||||
// Show selected style on nav item
|
||||
$('.b-nav__streams').addClass('selected');
|
||||
|
||||
// Display docs subnav items
|
||||
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
|
||||
});
|
||||
</script>
|
|
@ -141,13 +141,14 @@
|
|||
<li><a href="#connect_development">8.3 Connector Development Guide</a></li>
|
||||
</ul>
|
||||
</li>
|
||||
<li><a href="/{{version}}/documentation/streams">9. Kafka Streams</a>
|
||||
<li><a href="/documentation/streams">9. Kafka Streams</a>
|
||||
<ul>
|
||||
<li><a href="/{{version}}/documentation/streams/quickstart">9.1 Play with a Streams Application</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/developer-guide">9.2 Developer Guide</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/core-concepts">9.3 Core Concepts</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/architecture">9.4 Architecture</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/upgrade-guide">9.5 Upgrade Guide and API Changes</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/tutorial">9.2 Write your own Streams Applications</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/developer-guide">9.3 Developer Manual</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/core-concepts">9.4 Core Concepts</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/architecture">9.5 Architecture</a></li>
|
||||
<li><a href="/{{version}}/documentation/streams/upgrade-guide">9.6 Upgrade Guide</a></li>
|
||||
</ul>
|
||||
</li>
|
||||
</ul>
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.kafka.streams.examples.pipe;
|
|||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
|
@ -51,13 +50,13 @@ public class PipeDemo {
|
|||
|
||||
StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
builder.stream("streams-file-input").to("streams-pipe-output");
|
||||
builder.stream("streams-plaintext-input").to("streams-pipe-output");
|
||||
|
||||
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// attach shutdown handler to catch control-c
|
||||
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
|
||||
@Override
|
||||
public void run() {
|
||||
streams.close();
|
||||
|
@ -69,8 +68,8 @@ public class PipeDemo {
|
|||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
Exit.exit(1);
|
||||
System.exit(1);
|
||||
}
|
||||
Exit.exit(0);
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.streams.examples.temperature;
|
|||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
|
@ -137,8 +136,8 @@ public class TemperatureDemo {
|
|||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
Exit.exit(1);
|
||||
System.exit(1);
|
||||
}
|
||||
Exit.exit(0);
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
|
|||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
|
@ -37,7 +35,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
* Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
|
||||
* that computes a simple word occurrence histogram from an input text.
|
||||
*
|
||||
* In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
|
||||
* In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
|
||||
* represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
|
||||
* is an updated count of a single word.
|
||||
*
|
||||
|
@ -62,7 +60,7 @@ public class WordCountDemo {
|
|||
|
||||
StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
KStream<String, String> source = builder.stream("streams-wordcount-input");
|
||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||
|
||||
KTable<String, Long> counts = source
|
||||
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
|
||||
|
@ -70,13 +68,13 @@ public class WordCountDemo {
|
|||
public Iterable<String> apply(String value) {
|
||||
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
|
||||
}
|
||||
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
|
||||
})
|
||||
.groupBy(new KeyValueMapper<String, String, String>() {
|
||||
@Override
|
||||
public KeyValue<String, String> apply(String key, String value) {
|
||||
return new KeyValue<>(value, value);
|
||||
public String apply(String key, String value) {
|
||||
return value;
|
||||
}
|
||||
})
|
||||
.groupByKey()
|
||||
.count("Counts");
|
||||
|
||||
// need to override value serde to Long type
|
||||
|
@ -98,8 +96,8 @@ public class WordCountDemo {
|
|||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
Exit.exit(1);
|
||||
System.exit(1);
|
||||
}
|
||||
Exit.exit(0);
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.kafka.streams.examples.wordcount;
|
|||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
|
@ -40,7 +39,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
* Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
|
||||
* that computes a simple word occurrence histogram from an input text.
|
||||
*
|
||||
* In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
|
||||
* In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
|
||||
* represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
|
||||
* is an updated count of a single word.
|
||||
*
|
||||
|
@ -121,7 +120,7 @@ public class WordCountProcessorDemo {
|
|||
|
||||
Topology builder = new Topology();
|
||||
|
||||
builder.addSource("Source", "streams-wordcount-input");
|
||||
builder.addSource("Source", "streams-plaintext-input");
|
||||
|
||||
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
|
||||
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
|
||||
|
@ -144,8 +143,8 @@ public class WordCountProcessorDemo {
|
|||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
Exit.exit(1);
|
||||
System.exit(1);
|
||||
}
|
||||
Exit.exit(0);
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>streams-quickstart</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>streams-quickstart-java</artifactId>
|
||||
<packaging>maven-archetype</packaging>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,34 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<archetype-descriptor
|
||||
xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
|
||||
name="streams-quickstart-java">
|
||||
<fileSets>
|
||||
<fileSet filtered="true" packaged="true" encoding="UTF-8">
|
||||
<directory>src/main/java</directory>
|
||||
<includes>
|
||||
<include>**/*.java</include>
|
||||
</includes>
|
||||
</fileSet>
|
||||
<fileSet encoding="UTF-8">
|
||||
<directory>src/main/resources</directory>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
</archetype-descriptor>
|
|
@ -0,0 +1,136 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>${groupId}</groupId>
|
||||
<artifactId>${artifactId}</artifactId>
|
||||
<version>${version}</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>Kafka Streams Quickstart :: Java</name>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<kafka.version>1.0.0-SNAPSHOT</kafka.version>
|
||||
<slf4j.version>1.7.7</slf4j.version>
|
||||
<log4j.version>1.2.17</log4j.version>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>apache.snapshots</id>
|
||||
<name>Apache Development Snapshot Repository</name>
|
||||
<url>https://repository.apache.org/content/repositories/snapshots/</url>
|
||||
<releases>
|
||||
<enabled>false</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<!--
|
||||
Execute "mvn clean package -Pbuild-jar"
|
||||
to build a jar file out of this project!
|
||||
-->
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.1</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<compilerId>jdt</compilerId>
|
||||
</configuration>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.tycho</groupId>
|
||||
<artifactId>tycho-compiler-jdt</artifactId>
|
||||
<version>0.21.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.eclipse.m2e</groupId>
|
||||
<artifactId>lifecycle-mapping</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<configuration>
|
||||
<lifecycleMappingMetadata>
|
||||
<pluginExecutions>
|
||||
<pluginExecution>
|
||||
<pluginExecutionFilter>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<versionRange>[2.4,)</versionRange>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</pluginExecutionFilter>
|
||||
<action>
|
||||
<ignore/>
|
||||
</action>
|
||||
</pluginExecution>
|
||||
<pluginExecution>
|
||||
<pluginExecutionFilter>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<versionRange>[3.1,)</versionRange>
|
||||
<goals>
|
||||
<goal>testCompile</goal>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
</pluginExecutionFilter>
|
||||
<action>
|
||||
<ignore/>
|
||||
</action>
|
||||
</pluginExecution>
|
||||
</pluginExecutions>
|
||||
</lifecycleMappingMetadata>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
<!-- Apache Kafka dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-streams</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package ${package};
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.kstream.ValueMapper;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* In this example, we implement a simple LineSplit program using the high-level Streams DSL
|
||||
* that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text;
|
||||
* the code split each text line in string into words and then write back into a sink topic "streams-linesplit-output" where
|
||||
* each record represents a single word.
|
||||
*/
|
||||
public class LineSplit {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
builder.<String, String>stream("streams-plaintext-input")
|
||||
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
|
||||
@Override
|
||||
public Iterable<String> apply(String value) {
|
||||
return Arrays.asList(value.split("\\W+"));
|
||||
}
|
||||
})
|
||||
.to("streams-linesplit-output");
|
||||
|
||||
/* ------- use the code below for Java 8 and uncomment the above ----
|
||||
|
||||
builder.stream("streams-plaintext-input")
|
||||
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
|
||||
.to("streams-linesplit-output");
|
||||
|
||||
----------------------------------------------------------------- */
|
||||
|
||||
|
||||
final Topology topology = builder.build();
|
||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// attach shutdown handler to catch control-c
|
||||
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
|
||||
@Override
|
||||
public void run() {
|
||||
streams.close();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
System.exit(1);
|
||||
}
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package ${package};
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* In this example, we implement a simple LineSplit program using the high-level Streams DSL
|
||||
* that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
|
||||
* and writes the messages as-is into a sink topic "streams-pipe-output".
|
||||
*/
|
||||
public class Pipe {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
builder.stream("streams-plaintext-input").to("streams-pipe-output");
|
||||
|
||||
final Topology topology = builder.build();
|
||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// attach shutdown handler to catch control-c
|
||||
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
|
||||
@Override
|
||||
public void run() {
|
||||
streams.close();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
System.exit(1);
|
||||
}
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package ${package};
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.kstream.KeyValueMapper;
|
||||
import org.apache.kafka.streams.kstream.ValueMapper;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* In this example, we implement a simple WordCount program using the high-level Streams DSL
|
||||
* that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
|
||||
* split each text line into words and then compute the word occurence histogram, write the continuous updated histogram
|
||||
* into a topic "streams-wordcount-output" where each record is an updated count of a single word.
|
||||
*/
|
||||
public class WordCount {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Properties props = new Properties();
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
builder.<String, String>stream("streams-plaintext-input")
|
||||
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
|
||||
@Override
|
||||
public Iterable<String> apply(String value) {
|
||||
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
|
||||
}
|
||||
})
|
||||
.groupBy(new KeyValueMapper<String, String, String>() {
|
||||
@Override
|
||||
public String apply(String key, String value) {
|
||||
return value;
|
||||
}
|
||||
})
|
||||
.count("Counts")
|
||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
||||
|
||||
|
||||
/* ------- use the code below for Java 8 and uncomment the above ----
|
||||
|
||||
builder.stream("streams-plaintext-input")
|
||||
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
||||
.groupBy((key, value) -> value)
|
||||
.count("Counts")
|
||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
||||
|
||||
----------------------------------------------------------------- */
|
||||
|
||||
final Topology topology = builder.build();
|
||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// attach shutdown handler to catch control-c
|
||||
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
|
||||
@Override
|
||||
public void run() {
|
||||
streams.close();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
streams.start();
|
||||
latch.await();
|
||||
} catch (Throwable e) {
|
||||
System.exit(1);
|
||||
}
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
log4j.rootLogger=INFO, console
|
||||
|
||||
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
|
|
@ -0,0 +1,18 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
groupId=org.apache.kafka.archtypetest
|
||||
version=0.1
|
||||
artifactId=basic
|
||||
package=org.apache.kafka.archetypetest
|
|
@ -0,0 +1 @@
|
|||
compile
|
|
@ -0,0 +1,101 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>streams-quickstart</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
|
||||
<name>Kafka Streams :: Quickstart</name>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache</groupId>
|
||||
<artifactId>apache</artifactId>
|
||||
<version>18</version>
|
||||
</parent>
|
||||
|
||||
<modules>
|
||||
<module>java</module>
|
||||
</modules>
|
||||
<build>
|
||||
<extensions>
|
||||
<extension>
|
||||
<groupId>org.apache.maven.archetype</groupId>
|
||||
<artifactId>archetype-packaging</artifactId>
|
||||
<version>2.2</version>
|
||||
</extension>
|
||||
</extensions>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-archetype-plugin</artifactId>
|
||||
<version>2.2</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-archetype-plugin</artifactId>
|
||||
<version>2.2</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<!-- deactivate the shade plugin for the quickstart archetypes -->
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase/>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>com.github.siom79.japicmp</groupId>
|
||||
<artifactId>japicmp-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<!-- use alternative delimiter for filtering resources -->
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-resources-plugin</artifactId>
|
||||
<configuration>
|
||||
<useDefaultDelimiters>false</useDefaultDelimiters>
|
||||
<delimiters>
|
||||
<delimiter>@</delimiter>
|
||||
</delimiters>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/resources</directory>
|
||||
<filtering>true</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
</project>
|
Loading…
Reference in New Issue