KAFKA-3066: Demo Examples for Kafka Streams

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #797 from guozhangwang/K3066
Guozhang Wang 2016-01-22 15:25:24 -08:00 committed by Ewen Cheslack-Postava
parent a19729fe61
commit c197113a9c
33 changed files with 670 additions and 252 deletions

View File

@ -514,6 +514,7 @@ project(':streams') {
dependencies {
compile project(':clients')
compile project(':connect:json') // this dependency should be removed after we unify data API
compile libs.slf4jlog4j
compile libs.rocksDBJni
compile libs.zkclient // this dependency should be removed after KIP-4
@ -542,6 +543,30 @@ project(':streams') {
}
}
project(':streams:examples') {
archivesBaseName = "kafka-streams-examples"
dependencies {
compile project(':streams')
compile project(':connect:json') // this dependency should be removed after we unify data API
}
javadoc {
enabled = false
}
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.runtime) {
exclude('kafka-streams*')
}
into "$buildDir/dependant-libs-${versions.scala}"
}
jar {
dependsOn 'copyDependantLibs'
}
}
project(':log4j-appender') {
archivesBaseName = "kafka-log4j-appender"

View File

@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
'connect:api', 'connect:runtime', 'connect:json', 'connect:file'

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.streams.examples.pageview;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
* JSON deserializer that binds each record to a plain Java object (POJO) of a configurable target class
* using Jackson's ObjectMapper. The target class is supplied via the "JsonPOJOClass" configuration property.
*/
public class JsonPOJODeserializer<T> implements Deserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
/**
* Default constructor needed by Kafka
*/
public JsonPOJODeserializer() {
}
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> props, boolean isKey) {
tClass = (Class<T>) props.get("JsonPOJOClass");
}
@Override
public T deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;
T data;
try {
data = objectMapper.readValue(bytes, tClass);
} catch (Exception e) {
throw new SerializationException(e);
}
return data;
}
@Override
public void close() {
}
}
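A minimal usage sketch, not part of this commit, showing how this deserializer might be configured and exercised on its own. The "JsonPOJOClass" property name comes from configure() above; the demo class name and sample JSON are assumptions, and PageView refers to the nested POJO defined later in PageViewTypedJob.
package org.apache.kafka.streams.examples.pageview;
import java.util.HashMap;
import java.util.Map;
public class JsonPOJODeserializerDemo {
    public static void main(String[] args) {
        Map<String, Object> cfg = new HashMap<>();
        cfg.put("JsonPOJOClass", PageViewTypedJob.PageView.class);   // target type for binding
        JsonPOJODeserializer<PageViewTypedJob.PageView> deser = new JsonPOJODeserializer<>();
        deser.configure(cfg, false);                                  // false = value deserializer
        PageViewTypedJob.PageView view = deser.deserialize("streams-pageview-input",
                "{\"user\":\"alice\",\"page\":\"index.html\"}".getBytes());
        System.out.println(view.user + " viewed " + view.page);
    }
}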

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.streams.examples.pageview;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
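/**
* JSON serializer that writes plain Java objects (POJOs) as JSON bytes using Jackson's ObjectMapper;
* the counterpart to JsonPOJODeserializer above.
*/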
public class JsonPOJOSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
/**
* Default constructor needed by Kafka
*/
public JsonPOJOSerializer() {
}
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> props, boolean isKey) {
tClass = (Class<T>) props.get("JsonPOJOClass");
}
@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
}
}
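A quick round-trip sketch (illustrative only, with java.util.Collections assumed imported) pairing this serializer with the JsonPOJODeserializer above; PageView is again the nested POJO from PageViewTypedJob further down.
JsonPOJOSerializer<PageViewTypedJob.PageView> serializer = new JsonPOJOSerializer<>();
JsonPOJODeserializer<PageViewTypedJob.PageView> deserializer = new JsonPOJODeserializer<>();
deserializer.configure(Collections.singletonMap("JsonPOJOClass", PageViewTypedJob.PageView.class), false);
PageViewTypedJob.PageView in = new PageViewTypedJob.PageView();
in.user = "alice";
in.page = "index.html";
byte[] bytes = serializer.serialize("streams-pageview-input", in);   // topic name is not used for JSON
PageViewTypedJob.PageView out = deserializer.deserialize("streams-pageview-input", bytes);
// out.user is "alice" and out.page is "index.html" again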

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.pageview;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Count;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import java.util.Properties;
public class PageViewTypedJob {
// POJO classes
public static class PageView {
public String user;
public String page;
}
public static class UserProfile {
public String user;
public String region;
}
public static class PageViewByRegion {
public String user;
public String page;
public String region;
}
public static class WindowedPageViewByRegion {
public long windowStart;
public String region;
}
public static class RegionCount {
public long count;
public String region;
}
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
KStreamBuilder builder = new KStreamBuilder();
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Serializer<Long> longSerializer = new LongSerializer();
final Deserializer<Long> longDeserializer = new LongDeserializer();
KStream<String, PageView> views = builder.stream("streams-pageview-input");
KStream<String, PageView> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record));
KTable<String, UserProfile> users = builder.table("streams-userprofile-input");
KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsByUser
.leftJoin(users, (view, profile) -> {
PageViewByRegion viewByRegion = new PageViewByRegion();
viewByRegion.user = view.user;
viewByRegion.page = view.page;
viewByRegion.region = profile.region;
return viewByRegion;
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
@Override
public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
wViewByRegion.windowStart = key.window().start();
wViewByRegion.region = key.value();
RegionCount rCount = new RegionCount();
rCount.region = key.value();
rCount.count = value;
return new KeyValue<>(wViewByRegion, rCount);
}
});
// write to the result topic
regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
KafkaStreams kstream = new KafkaStreams(builder, props);
kstream.start();
}
}
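To feed the job some data, a driver along the following lines could be used. This is a sketch only, not part of this commit: the topic names and JsonPOJOSerializer come from the code above, while the driver class name and sample values are made up.
package org.apache.kafka.streams.examples.pageview;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class PageViewTypedDriver {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // String keys, JSON-POJO values; serializer instances are passed in directly
        KafkaProducer<String, Object> producer =
                new KafkaProducer<>(props, new StringSerializer(), new JsonPOJOSerializer<>());
        PageViewTypedJob.UserProfile profile = new PageViewTypedJob.UserProfile();
        profile.user = "alice";
        profile.region = "europe";
        producer.send(new ProducerRecord<String, Object>("streams-userprofile-input", "alice", profile));
        PageViewTypedJob.PageView view = new PageViewTypedJob.PageView();
        view.user = "alice";
        view.page = "index.html";
        producer.send(new ProducerRecord<String, Object>("streams-pageview-input", "alice", view));
        producer.close();
    }
}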

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.pageview;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Count;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import java.util.Properties;
public class PageViewUnTypedJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Serializer<Long> longSerializer = new LongSerializer();
final Deserializer<Long> longDeserializer = new LongDeserializer();
KStream<String, JsonNode> views = builder.stream("streams-pageview-input");
KStream<String, JsonNode> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record));
KTable<String, JsonNode> users = builder.table("streams-userprofile-input");
KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
KStream<JsonNode, JsonNode> regionCount = viewsByUser
.leftJoin(userRegions, (view, region) -> {
ObjectNode jNode = JsonNodeFactory.instance.objectNode();
return (JsonNode) jNode.put("user", view.get("user").textValue())
.put("page", view.get("page").textValue())
.put("region", region);
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.aggregateByKey(new Count<String, JsonNode>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
@Override
public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
keyNode.put("window-start", key.window().start())
.put("region", key.window().start());
ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
keyNode.put("count", value);
return new KeyValue<JsonNode, JsonNode>((JsonNode) keyNode, (JsonNode) valueNode);
}
});
// write to the result topic
regionCount.to("streams-pageviewstats-output");
KafkaStreams kstream = new KafkaStreams(builder, config);
kstream.start();
}
}
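For reference, the untyped job expects JSON records with the field names used in the get(...) calls above. A small illustrative fragment building such records with Jackson (JsonNodeFactory and ObjectNode imports assumed); both could be produced with org.apache.kafka.connect.json.JsonSerializer, the same value serializer configured for this job.
ObjectNode view = JsonNodeFactory.instance.objectNode();
view.put("user", "alice").put("page", "index.html");     // record for streams-pageview-input
ObjectNode profile = JsonNodeFactory.instance.objectNode();
profile.put("user", "alice").put("region", "europe");    // record for streams-userprofile-input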

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.pipe;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
public class PipeJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// can specify underlying client configs if necessary
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
builder.stream("streams-file-input").to("streams-pipe-output");
KafkaStreams kstream = new KafkaStreams(builder, props);
kstream.start();
}
}
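One optional refinement, sketched here rather than taken from the commit: closing the instance on JVM shutdown so the pipe stops cleanly. It assumes KafkaStreams exposes close() and that the snippet is placed at the end of main(), after kstream.start().
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        kstream.close();   // assumed: close() stops the stream threads
    }
}));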

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.wordcount;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Count;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import java.util.Arrays;
import java.util.Properties;
public class WordCountJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// can specify underlying client configs if necessary
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Serializer<Long> longSerializer = new LongSerializer();
final Deserializer<Long> longDeserializer = new LongDeserializer();
final Serializer<JsonNode> JsonSerializer = new JsonSerializer();
KStream<String, String> source = builder.stream("streams-file-input");
KStream<String, JsonNode> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase().split(" "));
}
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<String, String>(value, value);
}
})
.aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<String, JsonNode>>() {
@Override
public KeyValue<String, JsonNode> apply(Windowed<String> key, Long value) {
ObjectNode jNode = JsonNodeFactory.instance.objectNode();
jNode.put("word", key.value())
.put("count", value);
return new KeyValue<String, JsonNode>(null, jNode);
}
});
counts.to("streams-wordcount-output", stringSerializer, jsonSerializer);
KafkaStreams kstream = new KafkaStreams(builder, props);
kstream.start();
}
}
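To inspect the job's output, a throwaway consumer along these lines could be used. This is a sketch, not part of the commit: the topic name comes from counts.to(...) above, the group id and class name are assumptions, and values are read as raw strings since they are JSON.
package org.apache.kafka.streams.examples.wordcount;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class WordCountOutputReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-output-reader");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
        consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
        // poll a few times and print the JSON values, e.g. {"word":"kafka","count":3}
        for (int i = 0; i < 10; i++) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.value());
        }
        consumer.close();
    }
}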

View File

@ -15,10 +15,9 @@
* limitations under the License.
*/
package org.apache.kafka.streams.examples;
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
@ -34,7 +33,7 @@ import org.apache.kafka.streams.state.Stores;
import java.util.Properties;
public class ProcessorJob {
public class WordCountProcessorJob {
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
@ -49,17 +48,21 @@ public class ProcessorJob {
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("LOCAL-STATE");
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
}
@Override
public void process(String key, String value) {
Integer oldValue = this.kvStore.get(key);
Integer newValue = Integer.parseInt(value);
if (oldValue == null) {
this.kvStore.put(key, newValue);
} else {
this.kvStore.put(key, oldValue + newValue);
public void process(String dummy, String line) {
String[] words = line.toLowerCase().split(" ");
for (String word : words) {
Integer oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
}
}
context.commit();
@ -69,12 +72,14 @@ public class ProcessorJob {
public void punctuate(long timestamp) {
KeyValueIterator<String, Integer> iter = this.kvStore.all();
System.out.println("----------- " + timestamp + "----------- ");
while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
context.forward(entry.key, entry.value);
context.forward(entry.key, entry.value.toString());
}
iter.close();
@ -90,26 +95,27 @@ public class ProcessorJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor");
props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
StreamsConfig config = new StreamsConfig(props);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// can specify underlying client configs if necessary
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
builder.addSource("Source", "streams-file-input");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
builder.addSink("Sink", "streams-wordcount-output", "Process");
KafkaStreams streams = new KafkaStreams(builder, config);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}

View File

@ -113,7 +113,7 @@ public class StreamsConfig extends AbstractConfig {
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
static {
CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value
@ -136,8 +136,8 @@ public class StreamsConfig extends AbstractConfig {
StreamsConfig.ZOOKEEPER_CONNECT_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
SYSTEM_TEMP_DIRECTORY,
Importance.HIGH,
"/tmp/kafka-streams",
Importance.MEDIUM,
STATE_DIR_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value
Type.CLASS,

View File

@ -1,84 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import java.util.Properties;
public class KStreamJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, Integer> stream2 =
stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
@Override
public KeyValue<String, Integer> apply(String key, String value) {
return new KeyValue<>(key, new Integer(value));
}
}).filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return true;
}
});
KStream<String, Integer>[] streams = stream2.branch(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
},
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return true;
}
}
);
streams[0].to("topic2");
streams[1].to("topic3");
KafkaStreams kstream = new KafkaStreams(builder, config);
kstream.start();
}
}

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream;
public class Count<K> implements Aggregator<K, Long, Long> {
public class Count<K, V> implements Aggregator<K, V, Long> {
@Override
public Long initialValue(K aggKey) {
@ -25,12 +25,12 @@ public class Count<K> implements Aggregator<K, Long, Long> {
}
@Override
public Long add(K aggKey, Long value, Long aggregate) {
public Long add(K aggKey, V value, Long aggregate) {
return aggregate + 1L;
}
@Override
public Long remove(K aggKey, Long value, Long aggregate) {
public Long remove(K aggKey, V value, Long aggregate) {
return aggregate - 1L;
}
}
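The same three-method Aggregator shape (initialValue/add/remove) generalizes beyond counting, which is what the added value type parameter V enables. A hypothetical sum aggregator, sketched only against the methods visible in this diff; if the Aggregator interface declares further methods not shown here, they would need to be implemented as well.
import org.apache.kafka.streams.kstream.Aggregator;
public class LongSum<K> implements Aggregator<K, Long, Long> {
    @Override
    public Long initialValue(K aggKey) {
        return 0L;
    }
    @Override
    public Long add(K aggKey, Long value, Long aggregate) {
        return aggregate + value;   // unlike Count, the record value itself matters here
    }
    @Override
    public Long remove(K aggKey, Long value, Long aggregate) {
        return aggregate - value;
    }
}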

View File

@ -1,23 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public interface KeyValueToDoubleMapper<K, V> {
double apply(K key, V value);
}

View File

@ -1,23 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public interface KeyValueToIntMapper<K, V> {
int apply(K key, V value);
}

View File

@ -1,23 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public interface KeyValueToLongMapper<K, V> {
long apply(K key, V value);
}

View File

@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.TumblingWindow;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class TumblingWindows extends Windows<TumblingWindow> {
@ -53,7 +53,11 @@ public class TumblingWindows extends Windows<TumblingWindow> {
public Map<Long, TumblingWindow> windowsFor(long timestamp) {
long windowStart = timestamp - timestamp % size;
return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size));
// we cannot use Collections.singletonMap since it does not support the remove() call
Map<Long, TumblingWindow> windows = new HashMap<>();
windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size));
return windows;
}
@Override

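A note on the HashMap change above (the same pattern appears in UnlimitedWindows below): Collections.singletonMap returns an immutable map, so calling remove() on it throws UnsupportedOperationException, which is exactly what the plain HashMap avoids. A two-line illustration, with java.util.Map and java.util.Collections imports assumed:
Map<Long, String> m = Collections.singletonMap(0L, "window");
m.remove(0L);   // throws UnsupportedOperationException: the singleton map is immutable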
View File

@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class UnlimitedWindows extends Windows<UnlimitedWindow> {
@ -48,7 +48,13 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
@Override
public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
// always return the single unlimited window
return Collections.singletonMap(start, new UnlimitedWindow(start));
// we cannot use Collections.singletonMap since it does not support the remove() call
Map<Long, UnlimitedWindow> windows = new HashMap<>();
windows.put(start, new UnlimitedWindow(start));
return windows;
}
@Override

View File

@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
String name = topology.newName(SINK_NAME);
StreamsPartitioner<K, V> streamsPartitioner = null;
StreamPartitioner<K, V> streamPartitioner = null;
if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer);
streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
}
topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name);
topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
}
@Override

View File

@ -18,18 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> {
public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
private final WindowedSerializer<K> serializer;
public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) {
public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
this.serializer = serializer;
}
/**
* WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value
* WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
* and the current number of partitions. The partition number is determined by the original key of the windowed key
* using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
*

View File

@ -33,19 +33,19 @@ package org.apache.kafka.streams.processor;
* An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
* determine to which partition each message should be written.
* <p>
* To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner
* when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
* To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
* when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
* for that topic.
* <p>
* All StreamsPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
* All StreamPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
*
* @param <K> the type of keys
* @param <V> the type of values
* @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
* org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...)
* @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...)
* org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
* @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
*/
public interface StreamsPartitioner<K, V> {
public interface StreamPartitioner<K, V> {
/**
* Determine the partition number for a message with the given key and value and the current number of partitions.

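As a concrete illustration of the javadoc above, a hypothetical partitioner that is not part of this commit: it routes by a field of the value rather than by the key, reuses the PageViewByRegion POJO from the examples, and is written against the partition(key, value, numPartitions) signature used elsewhere in this change.
import org.apache.kafka.streams.examples.pageview.PageViewTypedJob;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class RegionPartitioner implements StreamPartitioner<String, PageViewTypedJob.PageViewByRegion> {
    @Override
    public Integer partition(String key, PageViewTypedJob.PageViewByRegion value, int numPartitions) {
        // partition by region so that all page views for one region land on the same partition
        return (value.region.hashCode() & 0x7fffffff) % numPartitions;
    }
}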
View File

@ -135,9 +135,9 @@ public class TopologyBuilder {
public final String topic;
private Serializer keySerializer;
private Serializer valSerializer;
private final StreamsPartitioner partitioner;
private final StreamPartitioner partitioner;
private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) {
private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
super(name);
this.parents = parents.clone();
this.topic = topic;
@ -245,9 +245,9 @@ public class TopologyBuilder {
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, StreamsPartitioner, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
@ -260,7 +260,7 @@ public class TopologyBuilder {
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
* <p>
* The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
* The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@ -274,9 +274,9 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) {
public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
}
@ -284,7 +284,7 @@ public class TopologyBuilder {
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers.
* <p>
* The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
* The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@ -302,11 +302,11 @@ public class TopologyBuilder {
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamsPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames);
return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
}
/**
@ -326,10 +326,10 @@ public class TopologyBuilder {
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamsPartitioner, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
*/
public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) {
public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");

View File

@ -57,7 +57,8 @@ public abstract class AbstractTask {
// create the processor state manager
try {
File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString());
File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString());
// if partitions is null, this is a standby task
this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {

View File

@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -72,7 +72,7 @@ public class RecordCollector {
}
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
StreamsPartitioner<K, V> partitioner) {
StreamPartitioner<K, V> partitioner) {
byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
Integer partition = null;

View File

@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class SinkNode<K, V> extends ProcessorNode<K, V> {
private final String topic;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
private final StreamsPartitioner<K, V> partitioner;
private final StreamPartitioner<K, V> partitioner;
private ProcessorContext context;
public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) {
public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
super(name);
this.topic = topic;

View File

@ -104,6 +104,18 @@ public class StreamThread extends Thread {
private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
private boolean processStandbyRecords = false;
static File makeStateDir(String jobId, String baseDirName) {
File baseDir = new File(baseDirName);
if (!baseDir.exists())
baseDir.mkdir();
File stateDir = new File(baseDir, jobId);
if (!stateDir.exists())
stateDir.mkdir();
return stateDir;
}
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@ -167,8 +179,7 @@ public class StreamThread extends Thread {
this.standbyRecords = new HashMap<>();
// read in task specific config values
this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
this.stateDir.mkdir();
this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@ -452,14 +463,15 @@ public class StreamThread extends Thread {
if (stateDirs != null) {
for (File dir : stateDirs) {
try {
TaskId id = TaskId.parse(dir.getName());
String dirName = dir.getName();
TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1));
// try to acquire the exclusive lock on the state directory
FileLock directoryLock = null;
try {
directoryLock = ProcessorStateManager.lockStateDirectory(dir);
if (directoryLock != null) {
log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs);
log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs);
Utils.delete(dir);
}
} catch (IOException e) {

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.streams.examples;
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.junit.Before;
import org.junit.Test;

View File

@ -33,7 +33,7 @@ import java.util.Random;
import static org.junit.Assert.assertEquals;
public class WindowedStreamsPartitionerTest {
public class WindowedStreamPartitionerTest {
private String topicName = "topic";
@ -59,7 +59,7 @@ public class WindowedStreamsPartitionerTest {
DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer);
WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
for (int k = 0; k < 10; k++) {
Integer key = rand.nextInt();

View File

@ -32,7 +32,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -194,8 +194,8 @@ public class ProcessorTopologyTest {
assertNull(driver.readOutput(topic));
}
protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) {
return new StreamsPartitioner<K, V>() {
protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
return new StreamPartitioner<K, V>() {
@Override
public Integer partition(K key, V value, int numPartitions) {
return partition;

View File

@ -93,8 +93,8 @@ public class StandbyTaskTest {
setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
}
@ -200,7 +200,7 @@ public class StandbyTaskTest {
task.close();
File taskDir = new File(baseDir, taskId.toString());
File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
Map<TopicPartition, Long> offsets = checkpoint.read();
@ -298,7 +298,7 @@ public class StandbyTaskTest {
task.close();
File taskDir = new File(baseDir, taskId.toString());
File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
Map<TopicPartition, Long> offsets = checkpoint.read();

View File

@ -59,8 +59,9 @@ import java.util.UUID;
public class StreamThreadTest {
private String clientId = "clientId";
private UUID processId = UUID.randomUUID();
private final String clientId = "clientId";
private final String jobId = "stream-thread-test";
private final UUID processId = UUID.randomUUID();
private TopicPartition t1p1 = new TopicPartition("topic1", 1);
private TopicPartition t1p2 = new TopicPartition("topic1", 2);
@ -117,8 +118,8 @@ public class StreamThreadTest {
setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
}
};
@ -128,13 +129,14 @@ public class StreamThreadTest {
public boolean committed = false;
public TestStreamTask(TaskId id,
String jobId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
StreamsConfig config) {
super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
}
@Override
@ -161,11 +163,11 @@ public class StreamThreadTest {
builder.addSource("source3", "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
@ -264,10 +266,12 @@ public class StreamThreadTest {
StreamsConfig config = new StreamsConfig(props);
File stateDir1 = new File(baseDir, task1.toString());
File stateDir2 = new File(baseDir, task2.toString());
File stateDir3 = new File(baseDir, task3.toString());
File extraDir = new File(baseDir, "X");
File jobDir = new File(baseDir, jobId);
jobDir.mkdir();
File stateDir1 = new File(jobDir, task1.toString());
File stateDir2 = new File(jobDir, task2.toString());
File stateDir3 = new File(jobDir, task3.toString());
File extraDir = new File(jobDir, "X");
stateDir1.mkdir();
stateDir2.mkdir();
stateDir3.mkdir();
@ -281,7 +285,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
@Override
public void maybeClean() {
super.maybeClean();
@ -290,7 +294,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
@ -403,7 +407,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
@Override
public void maybeCommit() {
super.maybeCommit();
@ -412,7 +416,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};

View File

@ -29,7 +29,7 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.test.MockProcessorContext;
@ -249,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
}
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
StreamsPartitioner<K1, V1> partitioner) {
StreamPartitioner<K1, V1> partitioner) {
recordFlushed(record.key(), record.value());
}
};

View File

@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@ -130,7 +130,7 @@ public class KStreamTestDriver {
@Override
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
StreamsPartitioner<K, V> partitioner) {
StreamPartitioner<K, V> partitioner) {
// The serialization is skipped.
process(record.topic(), record.key(), record.value());
}