MINOR: Move streams-examples source files under src folder
Also remove some unused imports.

Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #992 from guozhangwang/KSExamples
parent 79662cc7cb
commit edeb11bc56
@@ -52,6 +52,11 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/streams/examples/build/libs/kafka-streams-examples*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
@@ -259,7 +259,7 @@ for ( sv in ['2_10', '2_11'] ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
 tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10', 'jar_core_2_11'] + pkgs.collect { it + ":jar" }) { }
@@ -374,6 +374,7 @@ project(':core') {
     from(project(':connect:file').jar) { into("libs/") }
     from(project(':connect:file').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
+    from(project(':streams:examples').jar) { into("libs/") }
 }
 
 jar {
@@ -127,6 +127,11 @@
 
     <allow pkg="org.apache.kafka.streams"/>
 
+    <subpackage name="examples">
+        <allow pkg="com.fasterxml.jackson.databind" />
+        <allow pkg="org.apache.kafka.connect.json" />
+    </subpackage>
+
     <subpackage name="state">
         <allow pkg="org.rocksdb" />
     </subpackage>
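For context on why this checkstyle exemption is needed: the example jobs build JSON records with Jackson and ship them through the connect-json serializers. The sketch below shows the kind of code the new rule permits; the class name JsonSerdeSketch and the topic name "pageviews" are illustrative, not taken from the commit.

package org.apache.kafka.streams.examples.pageview;

// Both dependency groups below are only legal under checkstyle because of the
// new "examples" subpackage rule: com.fasterxml.jackson.databind and
// org.apache.kafka.connect.json.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;

public class JsonSerdeSketch {
    public static void main(String[] args) {
        // Build a JSON record the way the page-view jobs do.
        ObjectNode node = JsonNodeFactory.instance.objectNode();
        node.put("user", "alice").put("page", "index.html");

        // Round-trip the record through the connect-json serde classes.
        JsonSerializer serializer = new JsonSerializer();
        JsonDeserializer deserializer = new JsonDeserializer();
        byte[] bytes = serializer.serialize("pageviews", node);
        JsonNode back = deserializer.deserialize("pageviews", bytes);
        System.out.println(back);
    }
}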
Per the commit message, one unused import was removed in each of the import hunks that follow.

@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.examples.pageview;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.examples.pageview;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
The next hunk re-indents the join block; the old and new sides contain the same statements.

@@ -93,15 +92,15 @@ public class PageViewUntypedJob {
 
         KStream<JsonNode, JsonNode> regionCount = views
                 .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
                     @Override
                     public JsonNode apply(JsonNode view, String region) {
                         ObjectNode jNode = JsonNodeFactory.instance.objectNode();
 
                         return jNode.put("user", view.get("user").textValue())
                                 .put("page", view.get("page").textValue())
                                 .put("region", region == null ? "UNKNOWN" : region);
                     }
                 })
                 .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
                     @Override
                     public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
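As an aside (not part of the commit): the anonymous ValueJoiner above collapses to a lambda under Java 8. The holder class JoinerSketch below is hypothetical; at the time Kafka still supported Java 7, which is why the examples keep the anonymous-class form.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class JoinerSketch {
    // Same join logic as the anonymous class above, written as a lambda.
    static final ValueJoiner<JsonNode, String, JsonNode> JOINER = (view, region) -> {
        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
        return jNode.put("user", view.get("user").textValue())
                .put("page", view.get("page").textValue())
                .put("region", region == null ? "UNKNOWN" : region);
    };
}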
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.examples.pipe;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
@@ -64,7 +63,7 @@ public class WordCountProcessorJob {
 
         @Override
         public void process(String dummy, String line) {
-            String words[] = line.toLowerCase().split(" ");
+            String[] words = line.toLowerCase().split(" ");
 
             for (String word : words) {
                 Integer oldValue = this.kvStore.get(word);
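For the surrounding context of this hunk: process() lower-cases each input line, splits it into words, and bumps a per-word counter in a key-value store. Below is a minimal sketch of such a processor, assuming the 0.10-era Processor API; the class name and the "Counts" store name are assumptions, not copied from the commit.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessorSketch implements Processor<String, String> {
    private KeyValueStore<String, Integer> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Fetch the state store registered under an assumed name.
        this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        String[] words = line.toLowerCase().split(" ");
        for (String word : words) {
            // Increment the running count for this word.
            Integer oldValue = this.kvStore.get(word);
            this.kvStore.put(word, oldValue == null ? 1 : oldValue + 1);
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // No scheduled work in this sketch.
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}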