diff --git a/build.gradle b/build.gradle index 150cac7fec8..0b1dcc42554 100644 --- a/build.gradle +++ b/build.gradle @@ -514,6 +514,7 @@ project(':streams') { dependencies { compile project(':clients') + compile project(':connect:json') // this dependency should be removed after we unify data API compile libs.slf4jlog4j compile libs.rocksDBJni compile libs.zkclient // this dependency should be removed after KIP-4 @@ -542,6 +543,30 @@ project(':streams') { } } +project(':streams:examples') { + archivesBaseName = "kafka-streams-examples" + + dependencies { + compile project(':streams') + compile project(':connect:json') // this dependency should be removed after we unify data API + } + + javadoc { + enabled = false + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.runtime) { + exclude('kafka-streams*') + } + into "$buildDir/dependant-libs-${versions.scala}" + } + + jar { + dependsOn 'copyDependantLibs' + } +} + project(':log4j-appender') { archivesBaseName = "kafka-log4j-appender" diff --git a/settings.gradle b/settings.gradle index 097c43bbd1d..d430c2fd919 100644 --- a/settings.gradle +++ b/settings.gradle @@ -13,5 +13,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender', +include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender', 'connect:api', 'connect:runtime', 'connect:json', 'connect:file' diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java new file mode 100644 index 00000000000..583ec2d8690 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +/** + * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily + * structured data without having associated Java classes. This deserializer also supports Connect schemas. + */ +public class JsonPOJODeserializer implements Deserializer { + private ObjectMapper objectMapper = new ObjectMapper(); + + private Class tClass; + + /** + * Default constructor needed by Kafka + */ + public JsonPOJODeserializer() { + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map props, boolean isKey) { + tClass = (Class) props.get("JsonPOJOClass"); + } + + @Override + public T deserialize(String topic, byte[] bytes) { + if (bytes == null) + return null; + + T data; + try { + data = objectMapper.readValue(bytes, tClass); + } catch (Exception e) { + throw new SerializationException(e); + } + + return data; + } + + @Override + public void close() { + + } +} diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java new file mode 100644 index 00000000000..bb60327066b --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.streams.examples.pageview; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class JsonPOJOSerializer implements Serializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + + private Class tClass; + + /** + * Default constructor needed by Kafka + */ + public JsonPOJOSerializer() { + + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map props, boolean isKey) { + tClass = (Class) props.get("JsonPOJOClass"); + } + + @Override + public byte[] serialize(String topic, T data) { + if (data == null) + return null; + + try { + return objectMapper.writeValueAsBytes(data); + } catch (Exception e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + } + +} diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java new file mode 100644 index 00000000000..c06484814cc --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
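Reviewer note: a minimal sketch of how these two serdes could be exercised on their own, outside of a topology. It assumes a sketch class placed in the same package as the serdes; the PageView bean mirrors the one defined in PageViewTypedJob further down, and the "JsonPOJOClass" property name comes from the configure() implementations above.

    import java.util.Collections;
    import java.util.Map;

    public class JsonPOJOSerdeSketch {

        // same shape as the PageView POJO used by PageViewTypedJob
        public static class PageView {
            public String user;
            public String page;
        }

        public static void main(String[] args) {
            Map<String, Object> serdeProps = Collections.singletonMap("JsonPOJOClass", PageView.class);

            JsonPOJOSerializer<PageView> serializer = new JsonPOJOSerializer<>();
            serializer.configure(serdeProps, false);   // false = configuring a value serde

            JsonPOJODeserializer<PageView> deserializer = new JsonPOJODeserializer<>();
            deserializer.configure(serdeProps, false);

            PageView view = new PageView();
            view.user = "alice";
            view.page = "index.html";

            byte[] bytes = serializer.serialize("streams-pageview-input", view);
            PageView copy = deserializer.deserialize("streams-pageview-input", bytes);

            System.out.println(copy.user + " viewed " + copy.page);
        }
    }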
+ */ + +package org.apache.kafka.streams.examples.pageview; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.Properties; + +public class PageViewTypedJob { + + // POJO classes + static public class PageView { + public String user; + public String page; + } + + static public class UserProfile { + public String user; + public String region; + } + + static public class PageViewByRegion { + public String user; + public String page; + public String region; + } + + static public class WindowedPageViewByRegion { + public long windowStart; + public String region; + } + + static public class RegionCount { + public long count; + public String region; + } + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer stringSerializer = new StringSerializer(); + final Deserializer stringDeserializer = new StringDeserializer(); + final Serializer longSerializer = new LongSerializer(); + final Deserializer longDeserializer = new LongDeserializer(); + + + KStream views = builder.stream("streams-pageview-input"); + + KStream viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record)); + + KTable users = builder.table("streams-userprofile-input"); + + KStream regionCount = viewsByUser + .leftJoin(users, (view, profile) -> { + PageViewByRegion viewByRegion = new PageViewByRegion(); + viewByRegion.user = view.user; + viewByRegion.page = view.page; + viewByRegion.region = profile.region; + + return viewByRegion; + }) + .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion)) + .aggregateByKey(new Count(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + .toStream() + .map(new KeyValueMapper, Long, KeyValue>() { + @Override + public KeyValue apply(Windowed key, Long value) { + WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); + wViewByRegion.windowStart = key.window().start(); + wViewByRegion.region = key.value(); + + RegionCount rCount = new RegionCount(); + rCount.region = 
key.value(); + rCount.count = value; + + return new KeyValue<>(wViewByRegion, rCount); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>()); + + KafkaStreams kstream = new KafkaStreams(builder, props); + kstream.start(); + } +} diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java new file mode 100644 index 00000000000..1ae02c977f1 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.Properties; + +public class PageViewUnTypedJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + + StreamsConfig config = new StreamsConfig(props); + + KStreamBuilder builder = new KStreamBuilder(); + + final 
Serializer stringSerializer = new StringSerializer(); + final Deserializer stringDeserializer = new StringDeserializer(); + final Serializer longSerializer = new LongSerializer(); + final Deserializer longDeserializer = new LongDeserializer(); + + + KStream views = builder.stream("streams-pageview-input"); + + KStream viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record)); + + KTable users = builder.table("streams-userprofile-input"); + + KTable userRegions = users.mapValues(record -> record.get("region").textValue()); + + KStream regionCount = viewsByUser + .leftJoin(userRegions, (view, region) -> { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + return (JsonNode) jNode.put("user", view.get("user").textValue()) + .put("page", view.get("page").textValue()) + .put("region", region); + }) + .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion)) + .aggregateByKey(new Count(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + .toStream() + .map(new KeyValueMapper, Long, KeyValue>() { + @Override + public KeyValue apply(Windowed key, Long value) { + ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("window-start", key.window().start()) + .put("region", key.window().start()); + + ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("count", value); + + return new KeyValue((JsonNode) keyNode, (JsonNode) valueNode); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-output"); + + KafkaStreams kstream = new KafkaStreams(builder, config); + kstream.start(); + } +} diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java new file mode 100644 index 00000000000..4a4f97fb020 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.pipe; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; + +import java.util.Properties; + +public class PipeJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // can specify underlying client configs if necessary + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + builder.stream("streams-file-input").to("streams-pipe-output"); + + KafkaStreams kstream = new KafkaStreams(builder, props); + kstream.start(); + } +} diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java new file mode 100644 index 00000000000..8aa15a4529a --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
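Reviewer note: PipeJob (and the other example mains in this patch) call start() but never stop the instance. If KafkaStreams exposes a close() method — an assumption here, not something shown in this diff — a shutdown hook would keep the examples tidy:

    final KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // assumption: KafkaStreams.close() stops the stream threads cleanly
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));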
+ */ + +package org.apache.kafka.streams.examples.wordcount; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.Arrays; +import java.util.Properties; + +public class WordCountJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // can specify underlying client configs if necessary + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer stringSerializer = new StringSerializer(); + final Deserializer stringDeserializer = new StringDeserializer(); + final Serializer longSerializer = new LongSerializer(); + final Deserializer longDeserializer = new LongDeserializer(); + final Serializer JsonSerializer = new JsonSerializer(); + + KStream source = builder.stream("streams-file-input"); + + KStream counts = source + .flatMapValues(new ValueMapper>() { + @Override + public Iterable apply(String value) { + return Arrays.asList(value.toLowerCase().split(" ")); + } + }).map(new KeyValueMapper>() { + @Override + public KeyValue apply(String key, String value) { + return new KeyValue(value, value); + } + }) + .aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + .toStream() + .map(new KeyValueMapper, Long, KeyValue>() { + @Override + public KeyValue apply(Windowed key, Long value) { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + jNode.put("word", key.value()) + .put("count", value); + + return new KeyValue(null, jNode); + } + }); + + counts.to("streams-wordcount-output", stringSerializer, JsonSerializer); + + KafkaStreams kstream = new KafkaStreams(builder, props); + kstream.start(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java 
b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java similarity index 68% rename from streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java rename to streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java index e17c16b6c5d..63692bd927d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java @@ -15,10 +15,9 @@ * limitations under the License. */ -package org.apache.kafka.streams.examples; +package org.apache.kafka.streams.examples.wordcount; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; @@ -34,7 +33,7 @@ import org.apache.kafka.streams.state.Stores; import java.util.Properties; -public class ProcessorJob { +public class WordCountProcessorJob { private static class MyProcessorSupplier implements ProcessorSupplier { @@ -49,17 +48,21 @@ public class ProcessorJob { public void init(ProcessorContext context) { this.context = context; this.context.schedule(1000); - this.kvStore = (KeyValueStore) context.getStateStore("LOCAL-STATE"); + this.kvStore = (KeyValueStore) context.getStateStore("Counts"); } @Override - public void process(String key, String value) { - Integer oldValue = this.kvStore.get(key); - Integer newValue = Integer.parseInt(value); - if (oldValue == null) { - this.kvStore.put(key, newValue); - } else { - this.kvStore.put(key, oldValue + newValue); + public void process(String dummy, String line) { + String words[] = line.toLowerCase().split(" "); + + for (String word : words) { + Integer oldValue = this.kvStore.get(word); + + if (oldValue == null) { + this.kvStore.put(word, 1); + } else { + this.kvStore.put(word, oldValue + 1); + } } context.commit(); @@ -69,12 +72,14 @@ public class ProcessorJob { public void punctuate(long timestamp) { KeyValueIterator iter = this.kvStore.all(); + System.out.println("----------- " + timestamp + "----------- "); + while (iter.hasNext()) { KeyValue entry = iter.next(); System.out.println("[" + entry.key + ", " + entry.value + "]"); - context.forward(entry.key, entry.value); + context.forward(entry.key, entry.value.toString()); } iter.close(); @@ -90,26 +95,27 @@ public class ProcessorJob { public static void main(String[] args) throws Exception { Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor"); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - StreamsConfig config = new StreamsConfig(props); 
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // can specify underlying client configs if necessary + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source"); + builder.addSource("Source", "streams-file-input"); - builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE"); - builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS"); + builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); + builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); - builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); + builder.addSink("Sink", "streams-wordcount-output", "Process"); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 3843b1d6d65..16bb06a3070 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -113,7 +113,7 @@ public class StreamsConfig extends AbstractConfig { /** client.id */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; - private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir"); + private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor"; static { CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value @@ -136,8 +136,8 @@ public class StreamsConfig extends AbstractConfig { StreamsConfig.ZOOKEEPER_CONNECT_DOC) .define(STATE_DIR_CONFIG, Type.STRING, - SYSTEM_TEMP_DIRECTORY, - Importance.HIGH, + "/tmp/kafka-streams", + Importance.MEDIUM, STATE_DIR_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value Type.CLASS, diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java deleted file mode 100644 index a2343959829..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
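Reviewer note on the state.dir default change: state now lands under /tmp/kafka-streams instead of the JVM temp directory, and (per the makeStateDir() change later in this patch) each job gets its own subdirectory, so task state ends up in <state.dir>/<job.id>/<task.id>. A sketch of overriding it per job — the path here is only an illustrative value:

    Properties props = new Properties();
    props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // hypothetical location; task state would live under /var/lib/kafka-streams/streams-wordcount/<task.id>
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");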
- */ - -package org.apache.kafka.streams.examples; - -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; - -import java.util.Properties; - -public class KStreamJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - StreamsConfig config = new StreamsConfig(props); - - KStreamBuilder builder = new KStreamBuilder(); - - KStream stream1 = builder.stream("topic1"); - - KStream stream2 = - stream1.map(new KeyValueMapper>() { - @Override - public KeyValue apply(String key, String value) { - return new KeyValue<>(key, new Integer(value)); - } - }).filter(new Predicate() { - @Override - public boolean test(String key, Integer value) { - return true; - } - }); - - KStream[] streams = stream2.branch( - new Predicate() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }, - new Predicate() { - @Override - public boolean test(String key, Integer value) { - return true; - } - } - ); - - streams[0].to("topic2"); - streams[1].to("topic3"); - - KafkaStreams kstream = new KafkaStreams(builder, config); - kstream.start(); - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java index 3c1ed46e56e..8780cc721dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream; -public class Count implements Aggregator { +public class Count implements Aggregator { @Override public Long initialValue(K aggKey) { @@ -25,12 +25,12 @@ public class Count implements Aggregator { } @Override - public Long add(K aggKey, Long value, Long aggregate) { + public Long add(K aggKey, V value, Long aggregate) { return aggregate + 1L; } @Override - public Long remove(K aggKey, Long value, Long aggregate) { + public Long remove(K aggKey, V value, Long aggregate) { return aggregate - 1L; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java deleted file mode 100644 index ae3b85817f9..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public interface KeyValueToDoubleMapper { - - double apply(K key, V value); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java deleted file mode 100644 index 72e5ee935a1..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public interface KeyValueToIntMapper { - - int apply(K key, V value); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java deleted file mode 100644 index 3a8d8a89d18..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
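Reviewer note on the Count change a few hunks above: widening the value type parameter means a stream no longer has to carry Long values to be counted. A minimal sketch (topic, window name and window size are made up; the aggregateByKey call mirrors the one in PageViewTypedJob):

    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.kstream.Count;
    import org.apache.kafka.streams.kstream.HoppingWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Windowed;

    public class CountAnyValueSketch {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // hypothetical topic with String keys and String values
            KStream<String, String> clicks = builder.stream("clicks");

            // Count<K, V> no longer forces the record value type to be Long; the aggregate stays a Long
            KTable<Windowed<String>, Long> counts = clicks.aggregateByKey(
                    new Count<String, String>(),
                    HoppingWindows.of("ClickCounts").with(60 * 60 * 1000L),
                    new StringSerializer(), new LongSerializer(),
                    new StringDeserializer(), new LongDeserializer());

            // counts could then be piped to an output topic, as the page-view examples do
        }
    }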
- */ - -package org.apache.kafka.streams.kstream; - -public interface KeyValueToLongMapper { - - long apply(K key, V value); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java index 02ece3aae06..188fe662e58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.TumblingWindow; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class TumblingWindows extends Windows { @@ -53,7 +53,11 @@ public class TumblingWindows extends Windows { public Map windowsFor(long timestamp) { long windowStart = timestamp - timestamp % size; - return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size)); + // we cannot use Collections.singleMap since it does not support remove() call + Map windows = new HashMap<>(); + windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size)); + + return windows; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 6f472534c9a..06882b3709c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class UnlimitedWindows extends Windows { @@ -48,7 +48,13 @@ public class UnlimitedWindows extends Windows { @Override public Map windowsFor(long timestamp) { // always return the single unlimited window - return Collections.singletonMap(start, new UnlimitedWindow(start)); + + // we cannot use Collections.singleMap since it does not support remove() call + Map windows = new HashMap<>(); + windows.put(start, new UnlimitedWindow(start)); + + + return windows; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 98e50c3cb7a..7ebc28ca48a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.Serdes; @@ -217,14 +217,14 @@ public class KStreamImpl extends AbstractStream implements KStream keySerializer, Serializer valSerializer) { String name = topology.newName(SINK_NAME); - StreamsPartitioner streamsPartitioner = null; + StreamPartitioner streamPartitioner = null; if (keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer windowedSerializer = 
(WindowedSerializer) keySerializer; - streamsPartitioner = (StreamsPartitioner) new WindowedStreamsPartitioner(windowedSerializer); + streamPartitioner = (StreamPartitioner) new WindowedStreamPartitioner(windowedSerializer); } - topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name); + topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java similarity index 84% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index ff1fa2c645d..10e69cc776f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -18,18 +18,18 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; -public class WindowedStreamsPartitioner implements StreamsPartitioner, V> { +public class WindowedStreamPartitioner implements StreamPartitioner, V> { private final WindowedSerializer serializer; - public WindowedStreamsPartitioner(WindowedSerializer serializer) { + public WindowedStreamPartitioner(WindowedSerializer serializer) { this.serializer = serializer; } /** - * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value + * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value * and the current number of partitions. The partition number id determined by the original key of the windowed key * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java similarity index 86% rename from streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java rename to streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java index f8d199d91f1..f14d9d943f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java @@ -33,19 +33,19 @@ package org.apache.kafka.streams.processor; * An upstream topology producing messages to that topic can use a custom stream partitioner to precisely and consistently * determine to which partition each message should be written. *

- * To do this, create a StreamsPartitioner implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
+ * To do this, create a StreamPartitioner implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
  * for that topic.
  *
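Reviewer note: since this paragraph describes exactly that workflow, here is a short illustrative sketch of the renamed interface in use. The partition(key, value, numPartitions) signature is the one exercised elsewhere in this patch (see constantPartitioner in ProcessorTopologyTest); the node names and topics are made up:

    import org.apache.kafka.streams.processor.StreamPartitioner;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class CustomPartitionerSketch {

        // sends every message with the same String key to the same partition
        public static class KeyHashPartitioner implements StreamPartitioner<String, String> {
            @Override
            public Integer partition(String key, String value, int numPartitions) {
                return (key.hashCode() & 0x7fffffff) % numPartitions;
            }
        }

        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("Source", "some-input-topic");
            // the addSink(name, topic, partitioner, parents...) overload updated in this patch
            builder.addSink("Sink", "some-output-topic", new KeyHashPartitioner(), "Source");
        }
    }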

- * All StreamsPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes. + * All StreamPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes. * * @param the type of keys * @param the type of values * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, - * org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) - * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...) + * org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) + * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...) */ -public interface StreamsPartitioner { +public interface StreamPartitioner { /** * Determine the partition number for a message with the given key and value and the current number of partitions. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index f4e68213765..a6b54b7998b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -135,9 +135,9 @@ public class TopologyBuilder { public final String topic; private Serializer keySerializer; private Serializer valSerializer; - private final StreamsPartitioner partitioner; + private final StreamPartitioner partitioner; - private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) { + private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) { super(name); this.parents = parents.clone(); this.topic = topic; @@ -245,9 +245,9 @@ public class TopologyBuilder { * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume * and write to its topic * @return this builder instance so methods can be chained together; never null - * @see #addSink(String, String, StreamsPartitioner, String...) + * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ public final TopologyBuilder addSink(String name, String topic, String... parentNames) { return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames); @@ -260,7 +260,7 @@ public class TopologyBuilder { * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. *

- * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among + * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among * the named Kafka topic's partitions. Such control is often useful with topologies that use * {@link #addStateStore(StateStoreSupplier, String...) state stores} * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute @@ -274,9 +274,9 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) { + public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) { return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames); } @@ -284,7 +284,7 @@ public class TopologyBuilder { * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. * The sink will use the specified key and value serializers. *

- * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among + * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among * the named Kafka topic's partitions. Such control is often useful with topologies that use * {@link #addStateStore(StateStoreSupplier, String...) state stores} * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute @@ -302,11 +302,11 @@ public class TopologyBuilder { * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamsPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...) + * @see #addSink(String, String, StreamPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) { - return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames); + return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames); } /** @@ -326,10 +326,10 @@ public class TopologyBuilder { * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamsPartitioner, String...) + * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, String...) */ - public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner, String... parentNames) { + public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner, String... 
parentNames) { if (nodeFactories.containsKey(name)) throw new TopologyException("Processor " + name + " is already added."); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index ef4c3c74355..68680ab5d17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -57,7 +57,8 @@ public abstract class AbstractTask { // create the processor state manager try { - File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString()); + File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG)); + File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString()); // if partitions is null, this is a standby task this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby); } catch (IOException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index 25c663d1f19..fe0472eb9dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +72,7 @@ public class RecordCollector { } public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer, - StreamsPartitioner partitioner) { + StreamPartitioner partitioner) { byte[] keyBytes = keySerializer.serialize(record.topic(), record.key()); byte[] valBytes = valueSerializer.serialize(record.topic(), record.value()); Integer partition = null; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 88b3f56d4f0..7ab59ee6ca3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; public class SinkNode extends ProcessorNode { private final String topic; private Serializer keySerializer; private Serializer valSerializer; - private final StreamsPartitioner partitioner; + private final StreamPartitioner partitioner; private ProcessorContext context; - public SinkNode(String name, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) { + public SinkNode(String name, String topic, Serializer keySerializer, 
Serializer valSerializer, StreamPartitioner partitioner) { super(name); this.topic = topic; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e5d09221bcf..f118f60bfaf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -104,6 +104,18 @@ public class StreamThread extends Thread { private final Map>> standbyRecords; private boolean processStandbyRecords = false; + static File makeStateDir(String jobId, String baseDirName) { + File baseDir = new File(baseDirName); + if (!baseDir.exists()) + baseDir.mkdir(); + + File stateDir = new File(baseDir, jobId); + if (!stateDir.exists()) + stateDir.mkdir(); + + return stateDir; + } + final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() { @Override public void onPartitionsAssigned(Collection assignment) { @@ -167,8 +179,7 @@ public class StreamThread extends Thread { this.standbyRecords = new HashMap<>(); // read in task specific config values - this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG)); - this.stateDir.mkdir(); + this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG)); this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); @@ -452,14 +463,15 @@ public class StreamThread extends Thread { if (stateDirs != null) { for (File dir : stateDirs) { try { - TaskId id = TaskId.parse(dir.getName()); + String dirName = dir.getName(); + TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1)); // try to acquire the exclusive lock on the state directory FileLock directoryLock = null; try { directoryLock = ProcessorStateManager.lockStateDirectory(dir); if (directoryLock != null) { - log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs); + log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs); Utils.delete(dir); } } catch (IOException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java similarity index 95% rename from streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java index 26281d69d02..60b3b96dd89 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java @@ -15,7 +15,7 @@ * limitations under the License. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
similarity index 95%
rename from streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
index 26281d69d02..60b3b96dd89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.examples;
+package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.processor.TimestampExtractor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 777fae5e6d7..b2af9048af5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
+import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
similarity index 95%
rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 18494fd933b..1b8cbb8a449 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -33,7 +33,7 @@ import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 
-public class WindowedStreamsPartitionerTest {
+public class WindowedStreamPartitionerTest {
 
     private String topicName = "topic";
 
@@ -59,7 +59,7 @@ public class WindowedStreamsPartitionerTest {
         DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
         WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
 
-        WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer);
+        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
 
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();
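WallclockTimestampExtractor moves out of the examples package into processor.internals, so code that referenced it (such as StreamsConfigTest above) now imports the new location. As a hedged illustration only, not part of the patch: a wall-clock style extractor has roughly the shape below and is wired in by class name through StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, just as the test properties later in this patch do. The single-argument extract(ConsumerRecord) signature is an assumption based on the TimestampExtractor interface in this codebase.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class WallclockExtractorExample {

        // Ignores the record entirely and stamps it with the current wall-clock time.
        public static class MyWallclockExtractor implements TimestampExtractor {
            @Override
            public long extract(ConsumerRecord<Object, Object> record) {
                return System.currentTimeMillis();
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            // After the move, the built-in extractor is referenced by its new package name.
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor");
        }
    }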
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 60bd3090571..cb6ea056beb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -194,8 +194,8 @@ public class ProcessorTopologyTest {
         assertNull(driver.readOutput(topic));
     }
 
-    protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) {
-        return new StreamsPartitioner<K, V>() {
+    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
+        return new StreamPartitioner<K, V>() {
             @Override
             public Integer partition(K key, V value, int numPartitions) {
                 return partition;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index fd604b675f7..ffcf9ae15de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -93,8 +93,8 @@ public class StandbyTaskTest {
             setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
             setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
             setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+            setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
             setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-            setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test");
             setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
         }
@@ -200,7 +200,7 @@ public class StandbyTaskTest {
 
         task.close();
 
-        File taskDir = new File(baseDir, taskId.toString());
+        File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
         Map<TopicPartition, Long> offsets = checkpoint.read();
 
@@ -298,7 +298,7 @@ public class StandbyTaskTest {
 
         task.close();
 
-        File taskDir = new File(baseDir, taskId.toString());
+        File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
         Map<TopicPartition, Long> offsets = checkpoint.read();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2d531bcc770..039cb968191 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -59,8 +59,9 @@ import java.util.UUID;
 
 public class StreamThreadTest {
 
-    private String clientId = "clientId";
-    private UUID processId = UUID.randomUUID();
+    private final String clientId = "clientId";
+    private final String jobId = "stream-thread-test";
+    private final UUID processId = UUID.randomUUID();
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
     private TopicPartition t1p2 = new TopicPartition("topic1", 2);
@@ -117,8 +118,8 @@ public class StreamThreadTest {
             setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
             setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
             setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+            setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
             setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-            setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test");
             setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
         }
     };
@@ -128,13 +129,14 @@
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
+                              String jobId,
                               Collection<TopicPartition> partitions,
                               ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config) {
-            super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
+            super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -161,11 +163,11 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -264,10 +266,12 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            File stateDir1 = new File(baseDir, task1.toString());
-            File stateDir2 = new File(baseDir, task2.toString());
-            File stateDir3 = new File(baseDir, task3.toString());
-            File extraDir = new File(baseDir, "X");
+            File jobDir = new File(baseDir, jobId);
+            jobDir.mkdir();
+            File stateDir1 = new File(jobDir, task1.toString());
+            File stateDir2 = new File(jobDir, task2.toString());
+            File stateDir3 = new File(jobDir, task3.toString());
+            File extraDir = new File(jobDir, "X");
             stateDir1.mkdir();
             stateDir2.mkdir();
             stateDir3.mkdir();
@@ -281,7 +285,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -290,7 +294,7 @@
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -403,7 +407,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -412,7 +416,7 @@
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 36e487b617e..1e9c3bae839 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
@@ -249,7 +249,7 @@
             }
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                      StreamsPartitioner<K1, V1> partitioner) {
+                                      StreamPartitioner<K1, V1> partitioner) {
                 recordFlushed(record.key(), record.value());
             }
         };
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 8f8e00f7d4a..2dc567ea349 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -130,7 +130,7 @@
 
         @Override
         public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                  StreamsPartitioner<K1, V1> partitioner) {
+                                  StreamPartitioner<K1, V1> partitioner) {
             // The serialization is skipped.
             process(record.topic(), record.key(), record.value());
         }
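Both test drivers above stub out RecordCollector so that nothing is serialized or sent to a real broker. A rough, hypothetical sketch of such a stub is shown below; it assumes, as the drivers here suggest, that RecordCollector exposes a constructor taking the underlying producer (which the drivers pass as null) and that the generic send() signature matches the hunks above.

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.processor.StreamPartitioner;
    import org.apache.kafka.streams.processor.internals.RecordCollector;

    // Hypothetical test helper: observes forwarded records without serializing,
    // partitioning, or producing them anywhere.
    public class NoOpRecordCollector extends RecordCollector {

        public NoOpRecordCollector() {
            super(null); // no real producer; records never leave the JVM
        }

        @Override
        public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
                                  StreamPartitioner<K1, V1> partitioner) {
            System.out.println(record.topic() + ": " + record.key() + " -> " + record.value());
        }
    }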