diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 344e47f9fd5..f45d8d4af17 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -52,6 +52,11 @@ do CLASSPATH=$CLASSPATH:$file done +for file in $base_dir/streams/examples/build/libs/kafka-streams-examples*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar; do CLASSPATH=$CLASSPATH:$file diff --git a/build.gradle b/build.gradle index 5953f574d32..daf468fb52f 100644 --- a/build.gradle +++ b/build.gradle @@ -259,7 +259,7 @@ for ( sv in ['2_10', '2_11'] ) { } def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file'] -def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs +def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {} tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10', 'jar_core_2_11'] + pkgs.collect { it + ":jar" }) { } @@ -374,6 +374,7 @@ project(':core') { from(project(':connect:file').jar) { into("libs/") } from(project(':connect:file').configurations.runtime) { into("libs/") } from(project(':streams').jar) { into("libs/") } + from(project(':streams:examples').jar) { into("libs/") } } jar { diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index b183b3d6081..051c8d13ef6 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -127,6 +127,11 @@ + + + + + diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java similarity index 100% rename from streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java similarity index 100% rename from streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java similarity index 100% rename from streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java similarity index 99% rename from streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java index f7266e3922a..6a105fdb9c6 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.examples.pageview; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java similarity index 92% rename from streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java index 3241b8f4dc0..e8905895771 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java @@ -20,7 +20,6 @@ package org.apache.kafka.streams.examples.pageview; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -93,15 +92,15 @@ public class PageViewUntypedJob { KStream regionCount = views .leftJoin(userRegions, new ValueJoiner() { - @Override - public JsonNode apply(JsonNode view, String region) { - ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + @Override + public JsonNode apply(JsonNode view, String region) { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); - return jNode.put("user", view.get("user").textValue()) - .put("page", view.get("page").textValue()) - .put("region", region == null ? "UNKNOWN" : region); - } - }) + return jNode.put("user", view.get("user").textValue()) + .put("page", view.get("page").textValue()) + .put("region", region == null ? "UNKNOWN" : region); + } + }) .map(new KeyValueMapper>() { @Override public KeyValue apply(String user, JsonNode viewRegion) { diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java similarity index 98% rename from streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java index 79649d16f99..8885ca2aa9a 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.examples.pipe; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.KStreamBuilder; diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java similarity index 98% rename from streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java index 965eb797b68..82d216efb69 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.examples.wordcount; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java similarity index 97% rename from streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java rename to streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java index f5dd775f547..cb82656ccaa 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.examples.wordcount; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; @@ -64,7 +63,7 @@ public class WordCountProcessorJob { @Override public void process(String dummy, String line) { - String words[] = line.toLowerCase().split(" "); + String[] words = line.toLowerCase().split(" "); for (String word : words) { Integer oldValue = this.kvStore.get(word);