diff --git a/README.md b/README.md index 5b4e654400f..5ed30b2f4c0 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,36 @@ Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradl signing.password= signing.secretKeyRingFile= +### Publishing the streams quickstart archetype artifact to maven ### +For the Streams archetype project, one cannot use gradle to upload to maven; instead the `mvn deploy` command needs to be called at the quickstart folder: + + cd streams/quickstart + mvn deploy + +Please note for this to work you should create/update user maven settings (typically, `${USER_HOME}/.m2/settings.xml`) to assign the following variables + + + ... + + ... + + apache.snapshots.https + ${maven_username} + ${maven_password} + + + apache.releases.https + ${maven_username} + ${maven_password} + + ... + + ... + + ### Installing the jars to the local Maven repository ### ./gradlew installAll diff --git a/build.gradle b/build.gradle index 0e36a8547e2..48f3f2f9e24 100644 --- a/build.gradle +++ b/build.gradle @@ -121,7 +121,8 @@ if (new File('.git').exists()) { '**/README.md', '**/id_rsa', '**/id_rsa.pub', - 'checkstyle/suppressions.xml' + 'checkstyle/suppressions.xml', + 'streams/quickstart/java/src/test/resources/projects/basic/goal.txt' ]) } } diff --git a/docs/documentation/streams/tutorial.html b/docs/documentation/streams/tutorial.html new file mode 100644 index 00000000000..90f408df3f1 --- /dev/null +++ b/docs/documentation/streams/tutorial.html @@ -0,0 +1,19 @@ + + + + diff --git a/docs/js/templateData.js b/docs/js/templateData.js index 50997bd6b26..d727512cc5c 100644 --- a/docs/js/templateData.js +++ b/docs/js/templateData.js @@ -17,8 +17,8 @@ limitations under the License. // Define variables for doc templates var context={ - "version": "0110", - "dotVersion": "0.11.0", - "fullDotVersion": "0.11.0.0" + "version": "100", + "dotVersion": "1.0", + "fullDotVersion": "1.0.0" "scalaVersion:" "2.11" }; diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html index 031a37516ec..977fa5f4015 100644 --- a/docs/streams/quickstart.html +++ b/docs/streams/quickstart.html @@ -40,10 +40,10 @@ of the -Next, we create the input topic named streams-wordcount-input and the output topic named streams-wordcount-output: +Next, we create the input topic named streams-plaintext-input and the output topic named streams-wordcount-output:
 > bin/kafka-topics.sh --create \
     --zookeeper localhost:2181 \
     --replication-factor 1 \
     --partitions 1 \
-    --topic streams-wordcount-input
-Created topic "streams-wordcount-input".
+    --topic streams-plaintext-input
+Created topic "streams-plaintext-input".
 
 > bin/kafka-topics.sh --create \
     --zookeeper localhost:2181 \
@@ -143,8 +143,8 @@ The created topic can be described with the same kafka-topics tool:
 
 > bin/kafka-topics.sh --zookeeper localhost:2181 --describe
 
-Topic:streams-wordcount-input	PartitionCount:1	ReplicationFactor:1	Configs:
-    Topic: streams-wordcount-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
+Topic:streams-plaintext-input	PartitionCount:1	ReplicationFactor:1	Configs:
+    Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
 Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:
 	Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
 
@@ -158,7 +158,7 @@ The following command starts the WordCount demo application:

-The demo application will read from the input topic streams-wordcount-input, perform the computations of the WordCount algorithm on each of the read messages, +The demo application will read from the input topic streams-plaintext-input, perform the computations of the WordCount algorithm on each of the read messages, and continuously write its current results to the output topic streams-wordcount-output. Hence there won't be any STDOUT output except log entries as the results are written back into in Kafka.

@@ -166,7 +166,7 @@ Hence there won't be any STDOUT output except log entries as the results are wri Now we can start the console producer in a separate terminal to write some input data to this topic:
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
 
and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal: @@ -185,12 +185,12 @@ and inspect the output of the WordCount demo application by reading from its out

Step 5: Process some data

-Now let's write some message with the console producer into the input topic streams-wordcount-input by entering a single line of text and then hit <RETURN>. +Now let's write some message with the console producer into the input topic streams-plaintext-input by entering a single line of text and then hit <RETURN>. This will send a new message to the input topic, where the message key is null and the message value is the string encoded text line that you just entered (in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
 all streams lead to kafka
 
@@ -219,12 +219,12 @@ kafka 1 Here, the first column is the Kafka message key in java.lang.String format and represents a word that is being counted, and the second column is the message value in java.lang.Longformat, representing the word's latest count.

-Now let's continue writing one more message with the console producer into the input topic streams-wordcount-input. +Now let's continue writing one more message with the console producer into the input topic streams-plaintext-input. Enter the text line "hello kafka streams" and hit <RETURN>. Your terminal should look as follows:
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
 all streams lead to kafka
 hello kafka streams
 
@@ -321,7 +321,7 @@ Looking beyond the scope of this concrete example, what Kafka Streams is doing h diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html new file mode 100644 index 00000000000..a1f4880bf28 --- /dev/null +++ b/docs/streams/tutorial.html @@ -0,0 +1,630 @@ + + + + + +
+ + + +
+ +
+ + +
+
+
+ + diff --git a/docs/toc.html b/docs/toc.html index 2ec012928d1..57047688470 100644 --- a/docs/toc.html +++ b/docs/toc.html @@ -141,13 +141,14 @@
  • 8.3 Connector Development Guide
  • -
  • 9. Kafka Streams +
  • 9. Kafka Streams
  • diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java index 0831e3bd970..538987747c9 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.examples.pipe; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -51,13 +50,13 @@ public class PipeDemo { StreamsBuilder builder = new StreamsBuilder(); - builder.stream("streams-file-input").to("streams-pipe-output"); + builder.stream("streams-plaintext-input").to("streams-pipe-output"); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c - Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { + Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") { @Override public void run() { streams.close(); @@ -69,8 +68,8 @@ public class PipeDemo { streams.start(); latch.await(); } catch (Throwable e) { - Exit.exit(1); + System.exit(1); } - Exit.exit(0); + System.exit(0); } } diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java index 764210bab01..1c2045e04fc 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.examples.temperature; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -137,8 +136,8 @@ public class TemperatureDemo { streams.start(); latch.await(); } catch (Throwable e) { - Exit.exit(1); + System.exit(1); } - Exit.exit(0); + System.exit(0); } } diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index e3cf60c998d..5689d508913 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -18,9 +18,7 @@ package org.apache.kafka.streams.examples.wordcount; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; @@ -37,7 +35,7 @@ import java.util.concurrent.CountDownLatch; * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program * that computes a simple word occurrence histogram from an input text. * - * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages + * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record * is an updated count of a single word. * @@ -62,7 +60,7 @@ public class WordCountDemo { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream("streams-wordcount-input"); + KStream source = builder.stream("streams-plaintext-input"); KTable counts = source .flatMapValues(new ValueMapper>() { @@ -70,13 +68,13 @@ public class WordCountDemo { public Iterable apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")); } - }).map(new KeyValueMapper>() { + }) + .groupBy(new KeyValueMapper() { @Override - public KeyValue apply(String key, String value) { - return new KeyValue<>(value, value); + public String apply(String key, String value) { + return value; } }) - .groupByKey() .count("Counts"); // need to override value serde to Long type @@ -98,8 +96,8 @@ public class WordCountDemo { streams.start(); latch.await(); } catch (Throwable e) { - Exit.exit(1); + System.exit(1); } - Exit.exit(0); + System.exit(0); } } diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 34bb8bb5f39..b0b8be50986 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.examples.wordcount; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -40,7 +39,7 @@ import java.util.concurrent.CountDownLatch; * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program * that computes a simple word occurrence histogram from an input text. * - * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages + * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record * is an updated count of a single word. * @@ -121,7 +120,7 @@ public class WordCountProcessorDemo { Topology builder = new Topology(); - builder.addSource("Source", "streams-wordcount-input"); + builder.addSource("Source", "streams-plaintext-input"); builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); @@ -144,8 +143,8 @@ public class WordCountProcessorDemo { streams.start(); latch.await(); } catch (Throwable e) { - Exit.exit(1); + System.exit(1); } - Exit.exit(0); + System.exit(0); } } diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml new file mode 100644 index 00000000000..ff0a4173b74 --- /dev/null +++ b/streams/quickstart/java/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + UTF-8 + + + + org.apache.kafka + streams-quickstart + 1.0.0-SNAPSHOT + .. + + + streams-quickstart-java + maven-archetype + + \ No newline at end of file diff --git a/streams/quickstart/java/src/main/resources/META-INF/maven/archetype-metadata.xml b/streams/quickstart/java/src/main/resources/META-INF/maven/archetype-metadata.xml new file mode 100644 index 00000000000..9e0d8bd45b8 --- /dev/null +++ b/streams/quickstart/java/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -0,0 +1,34 @@ + + + + + + src/main/java + + **/*.java + + + + src/main/resources + + + diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml new file mode 100644 index 00000000000..8b79e066fb1 --- /dev/null +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -0,0 +1,136 @@ + + + + 4.0.0 + + ${groupId} + ${artifactId} + ${version} + jar + + Kafka Streams Quickstart :: Java + + + UTF-8 + 1.0.0-SNAPSHOT + 1.7.7 + 1.2.17 + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.8 + 1.8 + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + jdt + + + + org.eclipse.tycho + tycho-compiler-jdt + 0.21.0 + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-assembly-plugin + [2.4,) + + single + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + testCompile + compile + + + + + + + + + + + + + + + + + + org.apache.kafka + kafka-streams + ${kafka.version} + + + \ No newline at end of file diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java new file mode 100644 index 00000000000..ec40d2aad75 --- /dev/null +++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ${package}; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.ValueMapper; + +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +/** + * In this example, we implement a simple LineSplit program using the high-level Streams DSL + * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text; + * the code split each text line in string into words and then write back into a sink topic "streams-linesplit-output" where + * each record represents a single word. + */ +public class LineSplit { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + final StreamsBuilder builder = new StreamsBuilder(); + + builder.stream("streams-plaintext-input") + .flatMapValues(new ValueMapper>() { + @Override + public Iterable apply(String value) { + return Arrays.asList(value.split("\\W+")); + } + }) + .to("streams-linesplit-output"); + + /* ------- use the code below for Java 8 and uncomment the above ---- + + builder.stream("streams-plaintext-input") + .flatMapValues(value -> Arrays.asList(value.split("\\W+"))) + .to("streams-linesplit-output"); + + ----------------------------------------------------------------- */ + + + final Topology topology = builder.build(); + final KafkaStreams streams = new KafkaStreams(topology, props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (Throwable e) { + System.exit(1); + } + System.exit(0); + } +} diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java new file mode 100644 index 00000000000..b3152a78c8a --- /dev/null +++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ${package}; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +/** + * In this example, we implement a simple LineSplit program using the high-level Streams DSL + * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text, + * and writes the messages as-is into a sink topic "streams-pipe-output". + */ +public class Pipe { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + final StreamsBuilder builder = new StreamsBuilder(); + + builder.stream("streams-plaintext-input").to("streams-pipe-output"); + + final Topology topology = builder.build(); + final KafkaStreams streams = new KafkaStreams(topology, props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (Throwable e) { + System.exit(1); + } + System.exit(0); + } +} diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java new file mode 100644 index 00000000000..6dafa8cf832 --- /dev/null +++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ${package}; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueMapper; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +/** + * In this example, we implement a simple WordCount program using the high-level Streams DSL + * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text, + * split each text line into words and then compute the word occurence histogram, write the continuous updated histogram + * into a topic "streams-wordcount-output" where each record is an updated count of a single word. + */ +public class WordCount { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + final StreamsBuilder builder = new StreamsBuilder(); + + builder.stream("streams-plaintext-input") + .flatMapValues(new ValueMapper>() { + @Override + public Iterable apply(String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); + } + }) + .groupBy(new KeyValueMapper() { + @Override + public String apply(String key, String value) { + return value; + } + }) + .count("Counts") + .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + + + /* ------- use the code below for Java 8 and uncomment the above ---- + + builder.stream("streams-plaintext-input") + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) + .groupBy((key, value) -> value) + .count("Counts") + .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + + ----------------------------------------------------------------- */ + + final Topology topology = builder.build(); + final KafkaStreams streams = new KafkaStreams(topology, props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (Throwable e) { + System.exit(1); + } + System.exit(0); + } +} diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/resources/log4j.properties new file mode 100644 index 00000000000..b620f1bb390 --- /dev/null +++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file diff --git a/streams/quickstart/java/src/test/resources/projects/basic/archetype.properties b/streams/quickstart/java/src/test/resources/projects/basic/archetype.properties new file mode 100644 index 00000000000..c4a7c16adba --- /dev/null +++ b/streams/quickstart/java/src/test/resources/projects/basic/archetype.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +groupId=org.apache.kafka.archtypetest +version=0.1 +artifactId=basic +package=org.apache.kafka.archetypetest diff --git a/streams/quickstart/java/src/test/resources/projects/basic/goal.txt b/streams/quickstart/java/src/test/resources/projects/basic/goal.txt new file mode 100644 index 00000000000..f8808babbbb --- /dev/null +++ b/streams/quickstart/java/src/test/resources/projects/basic/goal.txt @@ -0,0 +1 @@ +compile \ No newline at end of file diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml new file mode 100644 index 00000000000..ef1ee0d1c76 --- /dev/null +++ b/streams/quickstart/pom.xml @@ -0,0 +1,101 @@ + + + + 4.0.0 + + org.apache.kafka + streams-quickstart + pom + 1.0.0-SNAPSHOT + + Kafka Streams :: Quickstart + + + org.apache + apache + 18 + + + + java + + + + + org.apache.maven.archetype + archetype-packaging + 2.2 + + + + + + org.apache.maven.plugins + maven-archetype-plugin + 2.2 + + + + + + maven-archetype-plugin + 2.2 + + true + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + + + + + + com.github.siom79.japicmp + japicmp-maven-plugin + + true + + + + + + org.apache.maven.plugins + maven-resources-plugin + + false + + @ + + + + + + + src/main/resources + true + + + + \ No newline at end of file