KAFKA-2804: manage changelog topics through ZK in PartitionAssignor

Author: Guozhang Wang <wangguoz@gmail.com>
Author: wangguoz@gmail.com <guozhang@Guozhang-Macbook.local>
Author: Guozhang Wang <guozhang@Guozhang-Macbook.local>

Reviewers: Yasuhiro Matsuda

Closes #579 from guozhangwang/K2804
Guozhang Wang 2015-12-07 15:12:09 -08:00 committed by Gwen Shapira
parent 23f36c5903
commit d05fa0a03b
38 changed files with 899 additions and 356 deletions

View File

@ -57,7 +57,7 @@ do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/stream/build/libs/kafka-streams*.jar;
for file in $base_dir/streams/build/libs/kafka-streams*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

View File

@ -561,6 +561,8 @@ project(':streams') {
compile project(':clients')
compile "$slf4jlog4j"
compile 'org.rocksdb:rocksdbjni:3.10.1'
compile 'com.101tec:zkclient:0.7' // this dependency should be removed after KIP-4
compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" // this dependency should be removed after KIP-4
testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives')

View File

@ -129,6 +129,14 @@
<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>
<subpackage name="processor">
<subpackage name="internals">
<allow pkg="org.I0Itec.zkclient" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.zookeeper" />
</subpackage>
</subpackage>
</subpackage>
<subpackage name="log4jappender">

View File

@ -73,9 +73,7 @@ public class KafkaStreaming {
private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.streaming";
private final Time time;
private static final String JMX_PREFIX = "kafka.streams";
// container states
private static final int CREATED = 0;
@ -85,29 +83,39 @@ public class KafkaStreaming {
private final StreamThread[] threads;
private String clientId;
private final UUID uuid;
private final Metrics metrics;
// processId is expected to be unique across JVMs and is used in the userData
// of the subscription request so that the assignor is aware of the co-location
// of the stream threads' consumers. It is for internal usage only and should
// not be exposed to users at all.
private final UUID processId;
public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
// create the metrics
this.time = new SystemTime();
this.uuid = UUID.randomUUID();
Time time = new SystemTime();
this.processId = UUID.randomUUID();
String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG);
if (jobId.length() <= 0)
jobId = "kafka-streams";
String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
Metrics metrics = new Metrics(metricConfig, reporters, time);
this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
for (int i = 0; i < this.threads.length; i++) {
this.threads[i] = new StreamThread(builder, config, this.clientId, this.uuid, this.metrics, this.time);
this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
}
}
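
The comment above explains the role of processId: it travels in the consumer subscription's userData so the assignor can tell which stream threads' consumers live in the same process. A minimal sketch of that flow, using the SubscriptionInfo class changed later in this patch (the task sets and values are hypothetical):

UUID processId = UUID.randomUUID();                               // one per KafkaStreaming instance (JVM)
Set<TaskId> prevTasks = Collections.singleton(new TaskId(0, 0));  // hypothetical
Set<TaskId> standbyTasks = Collections.<TaskId>emptySet();        // hypothetical

SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
ByteBuffer userData = info.encode();      // attached to this consumer's Subscription

// on the group leader, the assignor decodes it and groups consumers by process
SubscriptionInfo decoded = SubscriptionInfo.decode(userData);
assert decoded.processId.equals(processId);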

View File

@ -42,6 +42,10 @@ public class StreamingConfig extends AbstractConfig {
public static final String STATE_DIR_CONFIG = "state.dir";
private static final String STATE_DIR_DOC = "Directory location for state store.";
/** <code>zookeeper.connect</code> */
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
private static final String ZOOKEEPER_CONNECT_DOC = "ZooKeeper connect string for Kafka topic management.";
/** <code>commit.interval.ms</code> */
public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
@ -83,8 +87,9 @@ public class StreamingConfig extends AbstractConfig {
public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
/** <code>job.id</code> */
public static final String JOB_ID_CONFIG = "job.id";
public static final String JOB_ID_DOC = "An id string used to identify the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, and 3) the changelog topic prefix.";
/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@ -107,19 +112,30 @@ public class StreamingConfig extends AbstractConfig {
/** <code>metric.reporters</code> */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
/**
* <code>bootstrap.servers</code>
*/
/** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
static {
CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG,
CONFIG = new ConfigDef().define(JOB_ID_CONFIG,
Type.STRING,
"",
Importance.MEDIUM,
StreamingConfig.JOB_ID_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
Importance.MEDIUM,
CommonClientConfigs.CLIENT_ID_DOC)
.define(ZOOKEEPER_CONNECT_CONFIG,
Type.STRING,
"",
Importance.HIGH,
StreamingConfig.ZOOKEEPER_CONNECT_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
SYSTEM_TEMP_DIRECTORY,
@ -221,20 +237,27 @@ public class StreamingConfig extends AbstractConfig {
super(CONFIG, props);
}
public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
Map<String, Object> props = getBaseConsumerConfigs();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
return props;
}
public Map<String, Object> getRestoreConsumerConfigs() {
public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
Map<String, Object> props = getBaseConsumerConfigs();
// no group id for a restore consumer
// no need to set group id for a restore consumer
props.remove(ConsumerConfig.GROUP_ID_CONFIG);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
return props;
}
@ -248,11 +271,12 @@ public class StreamingConfig extends AbstractConfig {
props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
return props;
}
public Map<String, Object> getProducerConfigs() {
public Map<String, Object> getProducerConfigs(String clientId) {
Map<String, Object> props = this.originals();
// set producer default property values
@ -263,6 +287,8 @@ public class StreamingConfig extends AbstractConfig {
props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
return props;
}
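
A small sketch of how the getters above derive per-client-type ids, assuming the hypothetical job id "example-processor" and the auto-generated client id "example-processor-1" (KafkaStreaming falls back to jobId + "-" + sequence when client.id is unset):

StreamingConfig config = new StreamingConfig(props);  // props carry job.id = "example-processor"

Map<String, Object> consumerProps = config.getConsumerConfigs(streamThread, "example-processor", "example-processor-1");
// group.id  -> "example-processor"
// client.id -> "example-processor-1-consumer"

Map<String, Object> producerProps = config.getProducerConfigs("example-processor-1");
// client.id -> "example-processor-1-producer"

Map<String, Object> restoreProps = config.getRestoreConsumerConfigs("example-processor-1");
// client.id -> "example-processor-1-restore-consumer", and no group.id at all

Here streamThread stands for the owning StreamThread instance; it is stashed in the consumer config under InternalConfig.STREAM_THREAD_INSTANCE so the partition assignor can reach back to it.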

View File

@ -34,7 +34,7 @@ public class KStreamJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job");
props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream");
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

View File

@ -49,7 +49,7 @@ public class ProcessorJob {
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("LOCAL-STATE");
}
@Override
@ -90,8 +90,9 @@ public class ProcessorJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@ -104,8 +105,7 @@ public class ProcessorJob {
builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build());
builder.connectProcessorAndStateStores("local-state", "PROCESS");
builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");

View File

@ -85,7 +85,7 @@ public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
this.context = context;
this.partition = context.id().partition;
SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
context.register(this, restoreFunc);
context.register(this, true, restoreFunc);
for (ValueList<V> valueList : map.values()) {
valueList.clearDirtyValues();

View File

@ -177,7 +177,8 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
if (!source.isMaterialized()) {
StateStoreSupplier storeSupplier =
new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
topology.addStateStore(storeSupplier, name);
// mark this store as non-internal since it is read directly from a user topic
topology.addStateStore(storeSupplier, false, name);
source.materialize();
}
}

View File

@ -29,9 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
public class DefaultPartitionGrouper extends PartitionGrouper {
public class DefaultPartitionGrouper implements PartitionGrouper {
public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
@ -71,3 +71,6 @@ public class DefaultPartitionGrouper extends PartitionGrouper {
}
}

View File

@ -19,39 +19,18 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
import java.util.Map;
import java.util.Set;
public abstract class PartitionGrouper {
protected Map<Integer, Set<String>> topicGroups;
private KafkaStreamingPartitionAssignor partitionAssignor = null;
public interface PartitionGrouper {
/**
* Returns a map of task ids to groups of partitions.
*
* @param metadata
* @param topicGroups The subscribed topic groups
* @param metadata Metadata of the consuming cluster
* @return a map of task ids to groups of partitions
*/
public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
public void topicGroups(Map<Integer, Set<String>> topicGroups) {
this.topicGroups = topicGroups;
}
public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
this.partitionAssignor = partitionAssignor;
}
public Set<TaskId> taskIds(TopicPartition partition) {
return partitionAssignor.taskIds(partition);
}
public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return partitionAssignor.standbyTasks();
}
}
Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata);
}
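
With PartitionGrouper reduced to a stateless interface, a custom grouper only has to map the subscribed topic groups onto task ids. A minimal hypothetical implementation (one task per partition number within each topic group, similar in spirit to DefaultPartitionGrouper but not a copy of it):

public class OnePartitionPerTaskGrouper implements PartitionGrouper {
    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
            int topicGroupId = entry.getKey();
            for (String topic : entry.getValue()) {
                // one task per partition; all topics of the group sharing that partition number join the task
                for (PartitionInfo info : metadata.partitionsForTopic(topic)) {
                    TaskId taskId = new TaskId(topicGroupId, info.partition());
                    Set<TopicPartition> group = groups.get(taskId);
                    if (group == null) {
                        group = new HashSet<>();
                        groups.put(taskId, group);
                    }
                    group.add(new TopicPartition(topic, info.partition()));
                }
            }
        }
        return Collections.unmodifiableMap(groups);
    }
}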

View File

@ -79,7 +79,7 @@ public interface ProcessorContext {
*
* @param store the storage engine
*/
void register(StateStore store, StateRestoreCallback stateRestoreCallback);
void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
StateStore getStateStore(String name);

View File

@ -45,14 +45,17 @@ import java.util.Set;
* its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
* processes those messages, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
* is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
* to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
* that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
* to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming}
* instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}.
*/
public class TopologyBuilder {
// node factories in a topological order
private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
// state factories
private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
private final Set<String> sourceTopicNames = new HashSet<>();
private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
@ -60,8 +63,18 @@ public class TopologyBuilder {
private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
private Map<Integer, Set<String>> nodeGroups = null;
private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
private Map<String, Set<String>> stateStoreUsers = new HashMap();
private static class StateStoreFactory {
public final Set<String> users;
public final boolean isInternal;
public final StateStoreSupplier supplier;
StateStoreFactory(boolean isInternal, StateStoreSupplier supplier) {
this.isInternal = isInternal;
this.supplier = supplier;
this.users = new HashSet<>();
}
}
private static abstract class NodeFactory {
public final String name;
@ -88,6 +101,7 @@ public class TopologyBuilder {
stateStoreNames.add(stateStoreName);
}
@SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
return new ProcessorNode(name, supplier.get(), stateStoreNames);
@ -106,6 +120,7 @@ public class TopologyBuilder {
this.valDeserializer = valDeserializer;
}
@SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
return new SourceNode(name, keyDeserializer, valDeserializer);
@ -125,12 +140,40 @@ public class TopologyBuilder {
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
}
@SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
return new SinkNode(name, topic, keySerializer, valSerializer);
}
}
public static class TopicsInfo {
public Set<String> sourceTopics;
public Set<String> stateNames;
public TopicsInfo(Set<String> sourceTopics, Set<String> stateNames) {
this.sourceTopics = sourceTopics;
this.stateNames = stateNames;
}
@Override
public boolean equals(Object o) {
if (o instanceof TopicsInfo) {
TopicsInfo other = (TopicsInfo) o;
return other.sourceTopics.equals(this.sourceTopics) && other.stateNames.equals(this.stateNames);
} else {
return false;
}
}
@Override
public int hashCode() {
long n = ((long) sourceTopics.hashCode() << 32) | (long) stateNames.hashCode();
return (int) (n % 0xFFFFFFFFL);
}
}
/**
* Create a new builder.
*/
@ -138,9 +181,9 @@ public class TopologyBuilder {
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
* The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
* {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
* {@link StreamingConfig streaming configuration}.
* The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
* {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@ -158,11 +201,11 @@ public class TopologyBuilder {
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
* should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
* {@link StreamingConfig streaming configuration}
* should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
* {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
* should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
* {@link StreamingConfig streaming configuration}
* should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
@ -186,9 +229,9 @@ public class TopologyBuilder {
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
* {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link StreamingConfig streaming configuration}.
* The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
@ -205,11 +248,11 @@ public class TopologyBuilder {
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when writing messages to the topic; may be null if the sink
* should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
* {@link StreamingConfig streaming configuration}
* should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
* {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param valSerializer the {@link Serializer value serializer} used when writing messages to the topic; may be null if the sink
* should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link StreamingConfig streaming configuration}
* should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
@ -271,12 +314,12 @@ public class TopologyBuilder {
* @param supplier the supplier used to obtain this state store {@link StateStore} instance
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
if (stateStores.containsKey(supplier.name())) {
public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
if (stateFactories.containsKey(supplier.name())) {
throw new TopologyException("StateStore " + supplier.name() + " is already added.");
}
stateStores.put(supplier.name(), supplier);
stateStoreUsers.put(supplier.name(), new HashSet<String>());
stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier));
if (processorNames != null) {
for (String processorName : processorNames) {
@ -287,6 +330,16 @@ public class TopologyBuilder {
return this;
}
/**
* Adds a state store
*
* @param supplier the supplier used to obtain this state store {@link StateStore} instance
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
return this.addStateStore(supplier, true, processorNames);
}
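
Hypothetical usage of the two addStateStore overloads above (node and store names are illustrative; the processor supplier is the one from the ProcessorJob example): an internal store gets a changelog topic that the partition assignor creates and validates, while a store added with isInternal = false, as KTableImpl does, is read back directly from an existing user topic.

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "topic-source");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");

// internal store: isInternal defaults to true, so its changelog topic
// "<job.id>-COUNTS-changelog" is managed by KafkaStreamingPartitionAssignor
builder.addStateStore(Stores.create("COUNTS").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");

// non-internal store: backed by a user topic, so no changelog topic is created
// builder.addStateStore(userTopicBackedStoreSupplier, false, "PROCESS");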
/**
* Connects the processor and the state stores
*
@ -305,22 +358,22 @@ public class TopologyBuilder {
}
private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
if (!stateStores.containsKey(stateStoreName))
if (!stateFactories.containsKey(stateStoreName))
throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
if (!nodeFactories.containsKey(processorName))
throw new TopologyException("Processor " + processorName + " is not added yet.");
Set<String> users = stateStoreUsers.get(stateStoreName);
Iterator<String> iter = users.iterator();
StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
Iterator<String> iter = stateStoreFactory.users.iterator();
if (iter.hasNext()) {
String user = iter.next();
nodeGrouper.unite(user, processorName);
}
users.add(processorName);
stateStoreFactory.users.add(processorName);
NodeFactory factory = nodeFactories.get(processorName);
if (factory instanceof ProcessorNodeFactory) {
((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
NodeFactory nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
} else {
throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
}
@ -332,20 +385,32 @@ public class TopologyBuilder {
*
* @return groups of topic names
*/
public Map<Integer, Set<String>> topicGroups() {
Map<Integer, Set<String>> topicGroups = new HashMap<>();
public Map<Integer, TopicsInfo> topicGroups() {
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
Set<String> topicGroup = new HashSet<>();
Set<String> sourceTopics = new HashSet<>();
Set<String> stateNames = new HashSet<>();
for (String node : entry.getValue()) {
// if the node is a source node, add to the source topics
String[] topics = nodeToTopics.get(node);
if (topics != null)
topicGroup.addAll(Arrays.asList(topics));
sourceTopics.addAll(Arrays.asList(topics));
// if the node is connected to a state, add to the state topics
for (StateStoreFactory stateFactory : stateFactories.values()) {
if (stateFactory.isInternal && stateFactory.users.contains(node)) {
stateNames.add(stateFactory.supplier.name());
}
}
}
topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
topicGroups.put(entry.getKey(), new TopicsInfo(
Collections.unmodifiableSet(sourceTopics),
Collections.unmodifiableSet(stateNames)));
}
return Collections.unmodifiableMap(topicGroups);
@ -431,9 +496,9 @@ public class TopologyBuilder {
/**
* Build the topology for the specified topic group. This is called automatically when passing this builder into the
* {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
* {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor.
*
* @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
* @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)
*/
public ProcessorTopology build(Integer topicGroupId) {
Set<String> nodeGroup;
@ -467,7 +532,7 @@ public class TopologyBuilder {
}
for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
if (!stateStoreMap.containsKey(stateStoreName)) {
stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier);
}
}
} else if (factory instanceof SourceNodeFactory) {

View File

@ -43,6 +43,7 @@ public abstract class AbstractTask {
protected ProcessorContext processorContext;
protected AbstractTask(TaskId id,
String jobId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
@ -58,7 +59,7 @@ public abstract class AbstractTask {
try {
File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
// if partitions is null, this is a standby task
this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby);
this.stateMgr = new ProcessorStateManager(jobId, id.partition, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
throw new KafkaException("Error while creating the state manager", e);
}

View File

@ -21,9 +21,11 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@ -32,7 +34,21 @@ import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.ZooDefs;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -46,10 +62,146 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
private StreamThread streamThread;
private int numStandbyReplicas;
private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
private Map<String, Set<TaskId>> stateNameToTaskIds;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
// TODO: the following ZK dependency should be removed after KIP-4
private static final String ZK_TOPIC_PATH = "/brokers/topics";
private static final String ZK_BROKER_PATH = "/brokers/ids";
private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
private ZkClient zkClient;
private class ZKStringSerializer implements ZkSerializer {
@Override
public byte[] serialize(Object data) {
try {
return ((String) data).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new AssertionError(e);
}
}
@Override
public Object deserialize(byte[] bytes) {
try {
if (bytes == null)
return null;
else
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new AssertionError(e);
}
}
}
private List<Integer> getBrokers() {
List<Integer> brokers = new ArrayList<>();
for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
brokers.add(Integer.parseInt(broker));
}
Collections.sort(brokers);
log.debug("Read brokers {} from ZK in partition assignor.", brokers);
return brokers;
}
@SuppressWarnings("unchecked")
private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
if (data == null) return null;
try {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
});
Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
return partitions;
} catch (IOException e) {
throw new KafkaException(e);
}
}
private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
// we always assign leaders to brokers starting at the first one with replication factor 1
List<Integer> brokers = getBrokers();
Map<Integer, List<Integer>> assignment = new HashMap<>();
for (int i = 0; i < numPartitions; i++) {
assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
}
// try to write to ZK with open ACL
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("version", 1);
dataMap.put("partitions", assignment);
ObjectMapper mapper = new ObjectMapper();
String data = mapper.writeValueAsString(dataMap);
zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
} catch (JsonProcessingException e) {
throw new KafkaException(e);
}
}
private void deleteTopic(String topic) throws ZkNodeExistsException {
log.debug("Deleting topic {} from ZK in partition assignor.", topic);
zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
// we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
List<Integer> brokers = getBrokers();
int startIndex = existingAssignment.size();
Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
for (int i = 0; i < numPartitions; i++) {
newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size())));
}
// try to write to ZK with open ACL
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("version", 1);
dataMap.put("partitions", newAssignment);
ObjectMapper mapper = new ObjectMapper();
String data = mapper.writeValueAsString(dataMap);
zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
} catch (JsonProcessingException e) {
throw new KafkaException(e);
}
}
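
For reference, a sketch of the znode payload that createTopic() above writes under /brokers/topics/<topic>, assuming a three-partition changelog and live brokers [1, 2] (broker ids, topic name, and values are made up; exception handling omitted as in the surrounding methods):

Map<Integer, List<Integer>> assignment = new HashMap<>();
assignment.put(0, Collections.singletonList(1));   // partition 0 -> broker 1
assignment.put(1, Collections.singletonList(2));   // partition 1 -> broker 2
assignment.put(2, Collections.singletonList(1));   // partition 2 -> broker 1 (wraps around)

Map<String, Object> dataMap = new HashMap<>();
dataMap.put("version", 1);
dataMap.put("partitions", assignment);
String data = new ObjectMapper().writeValueAsString(dataMap);
// data is roughly: {"version":1,"partitions":{"0":[1],"1":[2],"2":[1]}}
// written with an open ACL, e.g. to /brokers/topics/example-processor-COUNTS-changelog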
/**
* We need the PartitionAssignor and its StreamThread to be mutually accessible
* since the former needs the latter's cached metadata while sending subscriptions,
* and the latter needs the former's returned assignment when adding tasks.
*/
@Override
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
@ -68,7 +220,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
}
streamThread = (StreamThread) o;
streamThread.partitionGrouper.partitionAssignor(this);
streamThread.partitionAssignor(this);
this.topicGroups = streamThread.builder.topicGroups();
if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG))
zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
}
@Override
@ -86,7 +243,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
Set<TaskId> prevTasks = streamThread.prevTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
standbyTasks.removeAll(prevTasks);
SubscriptionInfo data = new SubscriptionInfo(streamThread.clientUUID, prevTasks, standbyTasks);
SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
return new Subscription(new ArrayList<>(topics), data.encode());
}
@ -112,17 +269,17 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
Set<String> consumers = consumersByClient.get(info.clientUUID);
Set<String> consumers = consumersByClient.get(info.processId);
if (consumers == null) {
consumers = new HashSet<>();
consumersByClient.put(info.clientUUID, consumers);
consumersByClient.put(info.processId, consumers);
}
consumers.add(consumerId);
ClientState<TaskId> state = states.get(info.clientUUID);
ClientState<TaskId> state = states.get(info.processId);
if (state == null) {
state = new ClientState<>();
states.put(info.clientUUID, state);
states.put(info.processId, state);
}
state.prevActiveTasks.addAll(info.prevTasks);
@ -131,21 +288,40 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
state.capacity = state.capacity + 1d;
}
// Get partition groups from the partition grouper
Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata);
// get the tasks as partition groups from the partition grouper
Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
}
Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas);
// add tasks to state topic subscribers
stateNameToTaskIds = new HashMap<>();
for (TaskId task : partitionsForTask.keySet()) {
for (String stateName : topicGroups.get(task.topicGroupId).stateNames) {
Set<TaskId> tasks = stateNameToTaskIds.get(stateName);
if (tasks == null) {
tasks = new HashSet<>();
stateNameToTaskIds.put(stateName, tasks);
}
tasks.add(task);
}
}
// assign tasks to clients
states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
Map<String, Assignment> assignment = new HashMap<>();
for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
UUID uuid = entry.getKey();
UUID processId = entry.getKey();
Set<String> consumers = entry.getValue();
ClientState<TaskId> state = states.get(uuid);
ClientState<TaskId> state = states.get(processId);
ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
final int numActiveTasks = state.activeTasks.size();
for (TaskId id : state.activeTasks) {
taskIds.add(id);
for (TaskId taskId : state.activeTasks) {
taskIds.add(taskId);
}
for (TaskId id : state.assignedTasks) {
if (!state.activeTasks.contains(id))
@ -164,7 +340,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
for (int j = i; j < numTaskIds; j += numConsumers) {
TaskId taskId = taskIds.get(j);
if (j < numActiveTasks) {
for (TopicPartition partition : partitionGroups.get(taskId)) {
for (TopicPartition partition : partitionsForTask.get(taskId)) {
activePartitions.add(partition);
active.add(taskId);
}
@ -174,7 +350,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
standbyPartitions = new HashSet<>();
standby.put(taskId, standbyPartitions);
}
standbyPartitions.addAll(partitionGroups.get(taskId));
standbyPartitions.addAll(partitionsForTask.get(taskId));
}
}
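
To make the interleaving above concrete, suppose one process contributes two consumers and its client state holds five tasks, the first three active (all numbers hypothetical). Consumer i takes every numConsumers-th entry of taskIds starting at index i:

// taskIds = [0_0, 0_1, 0_2, 0_3, 0_4], numActiveTasks = 3, numConsumers = 2
// consumer 0 -> indices 0, 2, 4 -> active 0_0, 0_2; standby 0_4
// consumer 1 -> indices 1, 3    -> active 0_1;      standby 0_3
for (int i = 0; i < numConsumers; i++) {
    for (int j = i; j < numTaskIds; j += numConsumers) {
        System.out.println("consumer " + i + " gets task index " + j + (j < numActiveTasks ? " (active)" : " (standby)"));
    }
}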
@ -187,6 +363,63 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
}
}
// if ZK is specified, get the tasks for each state topic and validate the topic partitions
if (zkClient != null) {
log.debug("Starting to validate changelog topics in partition assignor.");
for (Map.Entry<String, Set<TaskId>> entry : stateNameToTaskIds.entrySet()) {
String topic = streamThread.jobId + "-" + entry.getKey() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = 0;
for (TaskId task : entry.getValue()) {
if (numPartitions < task.partition + 1)
numPartitions = task.partition + 1;
}
boolean topicNotReady = true;
while (topicNotReady) {
Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
// if topic does not exist, create it
if (topicMetadata == null) {
try {
createTopic(topic, numPartitions);
} catch (ZkNodeExistsException e) {
// ignore and continue
}
} else {
if (topicMetadata.size() > numPartitions) {
// else if topic exists with more #.partitions than needed, delete in order to re-create it
try {
deleteTopic(topic);
} catch (ZkNodeExistsException e) {
// ignore and continue
}
} else if (topicMetadata.size() < numPartitions) {
// else if topic exists with less #.partitions than needed, add partitions
try {
addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
} catch (ZkNoNodeException e) {
// ignore and continue
}
}
topicNotReady = false;
}
}
// wait until the topic metadata has been propagated to all brokers
List<PartitionInfo> partitions;
do {
partitions = streamThread.restoreConsumer.partitionsFor(topic);
} while (partitions == null || partitions.size() != numPartitions);
}
log.info("Completed validating changelog topics in partition assignor.");
}
return assignment;
}
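
A worked example of the partition-count rule used above: the changelog for a store needs max(TaskId.partition) + 1 partitions over the tasks that use it (task ids and names below are hypothetical).

Set<TaskId> tasksForStore = new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(0, 3)));
int numPartitions = 0;
for (TaskId task : tasksForStore) {
    if (numPartitions < task.partition + 1)
        numPartitions = task.partition + 1;
}
// numPartitions == 4; the assignor then creates, expands, or re-creates
// "example-processor-COUNTS-changelog" until it has exactly 4 partitions.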
@ -220,7 +453,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
this.partitionToTaskIds = partitionToTaskIds;
}
public Set<TaskId> taskIds(TopicPartition partition) {
/* For Test Only */
public Set<TaskId> tasksForState(String stateName) {
return stateNameToTaskIds.get(stateName);
}
public Set<TaskId> tasksForPartition(TopicPartition partition) {
return partitionToTaskIds.get(partition);
}

View File

@ -118,11 +118,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
}
@Override
public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (initialized)
throw new KafkaException("Can only create state stores during initialization.");
stateMgr.register(store, stateRestoreCallback);
stateMgr.register(store, loggingEnabled, stateRestoreCallback);
}
@Override

View File

@ -19,10 +19,11 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,9 +44,11 @@ public class ProcessorStateManager {
private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
public static final String LOCK_FILE_NAME = ".lock";
private final String jobId;
private final int partition;
private final File baseDir;
private final FileLock directoryLock;
@ -55,9 +58,10 @@ public class ProcessorStateManager {
private final Map<TopicPartition, Long> checkpointedOffsets;
private final Map<TopicPartition, Long> offsetLimits;
private final boolean isStandby;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
public ProcessorStateManager(String jobId, int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
this.jobId = jobId;
this.partition = partition;
this.baseDir = baseDir;
this.stores = new HashMap<>();
@ -90,6 +94,10 @@ public class ProcessorStateManager {
}
}
public static String storeChangelogTopic(String jobId, String storeName) {
return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}
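
For concreteness, with the hypothetical job id and store name used in the examples above:

String topic = ProcessorStateManager.storeChangelogTopic("example-processor", "COUNTS");
// topic == "example-processor-COUNTS-changelog"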
public static FileLock lockStateDirectory(File stateDir) throws IOException {
File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
@ -104,7 +112,7 @@ public class ProcessorStateManager {
return this.baseDir;
}
public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (store.name().equals(CHECKPOINT_FILE_NAME))
throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
@ -112,44 +120,52 @@ public class ProcessorStateManager {
throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
// check whether the underlying changelog topic exists
if (restoreConsumer.listTopics().containsKey(store.name())) {
boolean partitionNotFound = true;
for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) {
String topic;
if (loggingEnabled)
topic = storeChangelogTopic(this.jobId, store.name());
else topic = store.name();
// block until the partition is ready for this state changelog topic or time has elapsed
boolean partitionNotFound = true;
long startTime = System.currentTimeMillis();
long waitTime = 5000L; // hard-code the value since we should not block after KIP-4
do {
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
// ignore
}
for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(topic)) {
if (partitionInfo.partition() == partition) {
partitionNotFound = false;
break;
}
}
} while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
if (partitionNotFound)
throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition " + partition);
} else {
throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet");
}
if (partitionNotFound)
throw new KafkaException("Store " + store.name() + "'s change log does not contain partition " + partition);
this.stores.put(store.name(), store);
if (isStandby) {
if (store.persistent())
restoreCallbacks.put(store.name(), stateRestoreCallback);
restoreCallbacks.put(topic, stateRestoreCallback);
} else {
restoreActiveState(store, stateRestoreCallback);
}
}
private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) {
if (store == null)
throw new IllegalArgumentException("Store must not be null.");
// ---- try to restore the state from change-log ---- //
// subscribe to the store's partition
if (!restoreConsumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
}
TopicPartition storePartition = new TopicPartition(store.name(), partition);
TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), partition);
restoreConsumer.assign(Collections.singletonList(storePartition));
try {
@ -195,8 +211,8 @@ public class ProcessorStateManager {
Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
String storeName = entry.getKey();
TopicPartition storePartition = new TopicPartition(storeName, partition);
String topicName = entry.getKey();
TopicPartition storePartition = new TopicPartition(topicName, partition);
if (checkpointedOffsets.containsKey(storePartition)) {
partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
@ -212,6 +228,7 @@ public class ProcessorStateManager {
List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
// restore states from changelog records
StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
long lastOffset = -1L;
@ -276,7 +293,7 @@ public class ProcessorStateManager {
Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
for (String storeName : stores.keySet()) {
TopicPartition part = new TopicPartition(storeName, partition);
TopicPartition part = new TopicPartition(storeChangelogTopic(jobId, storeName), partition);
// only checkpoint the offset to the offsets file if it is persistent;
if (stores.get(storeName).persistent()) {

View File

@ -110,11 +110,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
}
@Override
public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (initialized)
throw new KafkaException("Can only create state stores during initialization.");
stateMgr.register(store, stateRestoreCallback);
stateMgr.register(store, loggingEnabled, stateRestoreCallback);
}
@Override

View File

@ -45,19 +45,23 @@ public class StandbyTask extends AbstractTask {
* Create {@link StandbyTask} with its assigned partitions
*
* @param id the ID of this task
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
* @param jobId the ID of the job
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
* @param config the {@link StreamingConfig} specified by the user
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StandbyTask(TaskId id,
String jobId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
StreamingMetrics metrics) {
super(id, partitions, topology, consumer, restoreConsumer, config, true);
super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);

View File

@ -61,15 +61,17 @@ public class StreamTask extends AbstractTask implements Punctuator {
* Create {@link StreamTask} with its assigned partitions
*
* @param id the ID of this task
* @param jobId the ID of the job
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
* @param producer the instance of {@link Producer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamingConfig} specified by the user
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StreamTask(TaskId id,
String jobId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
@ -77,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
StreamingMetrics metrics) {
super(id, partitions, topology, consumer, restoreConsumer, config, false);
super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
this.punctuationQueue = new PunctuationQueue();
this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);

View File

@ -70,7 +70,9 @@ public class StreamThread extends Thread {
private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
public final PartitionGrouper partitionGrouper;
public final UUID clientUUID;
public final String jobId;
public final String clientId;
public final UUID processId;
protected final StreamingConfig config;
protected final TopologyBuilder builder;
@ -83,7 +85,6 @@ public class StreamThread extends Thread {
private final Map<TaskId, StreamTask> activeTasks;
private final Map<TaskId, StandbyTask> standbyTasks;
private final Set<TaskId> prevTasks;
private final String clientId;
private final Time time;
private final File stateDir;
private final long pollTimeMs;
@ -92,6 +93,8 @@ public class StreamThread extends Thread {
private final long totalRecordsToProcess;
private final StreamingMetricsImpl sensors;
private KafkaStreamingPartitionAssignor partitionAssignor = null;
private long lastClean;
private long lastCommit;
private long recordsProcessed;
@ -118,11 +121,12 @@ public class StreamThread extends Thread {
public StreamThread(TopologyBuilder builder,
StreamingConfig config,
String jobId,
String clientId,
UUID clientUUID,
UUID processId,
Metrics metrics,
Time time) throws Exception {
this(builder, config, null , null, null, clientId, clientUUID, metrics, time);
this(builder, config, null , null, null, jobId, clientId, processId, metrics, time);
}
StreamThread(TopologyBuilder builder,
@ -130,19 +134,20 @@ public class StreamThread extends Thread {
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
String jobId,
String clientId,
UUID clientUUID,
UUID processId,
Metrics metrics,
Time time) throws Exception {
super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
this.jobId = jobId;
this.config = config;
this.builder = builder;
this.sourceTopics = builder.sourceTopics();
this.clientId = clientId;
this.clientUUID = clientUUID;
this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
this.partitionGrouper.topicGroups(builder.topicGroups());
// set the producer and consumer clients
this.producer = (producer != null) ? producer : createProducer();
@ -175,23 +180,27 @@ public class StreamThread extends Thread {
this.running = new AtomicBoolean(true);
}
public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
this.partitionAssignor = partitionAssignor;
}
private Producer<byte[], byte[]> createProducer() {
log.info("Creating producer client for stream thread [" + this.getName() + "]");
return new KafkaProducer<>(config.getProducerConfigs(),
return new KafkaProducer<>(config.getProducerConfigs(this.clientId),
new ByteArraySerializer(),
new ByteArraySerializer());
}
private Consumer<byte[], byte[]> createConsumer() {
log.info("Creating consumer client for stream thread [" + this.getName() + "]");
return new KafkaConsumer<>(config.getConsumerConfigs(this),
return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
private Consumer<byte[], byte[]> createRestoreConsumer() {
log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
return new KafkaConsumer<>(config.getRestoreConsumerConfigs(),
return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
@ -516,14 +525,17 @@ public class StreamThread extends Thread {
ProcessorTopology topology = builder.build(id.topicGroupId);
return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
}
private void addStreamTasks(Collection<TopicPartition> assignment) {
if (partitionAssignor == null)
throw new KafkaException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
for (TopicPartition partition : assignment) {
Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
Set<TaskId> taskIds = partitionAssignor.tasksForPartition(partition);
for (TaskId taskId : taskIds) {
Set<TopicPartition> partitions = partitionsForTask.get(taskId);
if (partitions == null) {
@ -574,17 +586,20 @@ public class StreamThread extends Thread {
ProcessorTopology topology = builder.build(id.topicGroupId);
if (!topology.stateStoreSuppliers().isEmpty()) {
return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors);
return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
} else {
return null;
}
}
private void addStandbyTasks() {
if (partitionAssignor == null)
throw new KafkaException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
// create the standby tasks
for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionGrouper.standbyTasks().entrySet()) {
for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
TaskId taskId = entry.getKey();
Set<TopicPartition> partitions = entry.getValue();
StandbyTask task = createStandbyTask(taskId, partitions);

View File

@ -30,30 +30,32 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
private static final int CURRENT_VERSION = 1;
public final int version;
public final UUID clientUUID;
public final UUID processId;
public final Set<TaskId> prevTasks;
public final Set<TaskId> standbyTasks;
public SubscriptionInfo(UUID clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
this(1, clientUUID, prevTasks, standbyTasks);
public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
this(CURRENT_VERSION, processId, prevTasks, standbyTasks);
}
private SubscriptionInfo(int version, UUID clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
this.version = version;
this.clientUUID = clientUUID;
this.processId = processId;
this.prevTasks = prevTasks;
this.standbyTasks = standbyTasks;
}
public ByteBuffer encode() {
if (version == 1) {
ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
if (version == CURRENT_VERSION) {
ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
// version
buf.putInt(1);
buf.putInt(version);
// encode client UUID
buf.putLong(clientUUID.getMostSignificantBits());
buf.putLong(clientUUID.getLeastSignificantBits());
buf.putLong(processId.getMostSignificantBits());
buf.putLong(processId.getLeastSignificantBits());
// encode ids of previously running tasks
buf.putInt(prevTasks.size());
for (TaskId id : prevTasks) {
@ -81,9 +83,9 @@ public class SubscriptionInfo {
// Decode version
int version = data.getInt();
if (version == 1) {
if (version == CURRENT_VERSION) {
// Decode client UUID
UUID clientUUID = new UUID(data.getLong(), data.getLong());
UUID processId = new UUID(data.getLong(), data.getLong());
// Decode previously active tasks
Set<TaskId> prevTasks = new HashSet<>();
int numPrevs = data.getInt();
@ -98,7 +100,7 @@ public class SubscriptionInfo {
standbyTasks.add(TaskId.readFrom(data));
}
return new SubscriptionInfo(version, clientUUID, prevTasks, standbyTasks);
return new SubscriptionInfo(version, processId, prevTasks, standbyTasks);
} else {
TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
@ -109,7 +111,7 @@ public class SubscriptionInfo {
@Override
public int hashCode() {
return version ^ clientUUID.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
return version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
}
@Override
@ -117,7 +119,7 @@ public class SubscriptionInfo {
if (o instanceof SubscriptionInfo) {
SubscriptionInfo other = (SubscriptionInfo) o;
return this.version == other.version &&
this.clientUUID.equals(other.clientUUID) &&
this.processId.equals(other.processId) &&
this.prevTasks.equals(other.prevTasks) &&
this.standbyTasks.equals(other.standbyTasks);
} else {
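For reference, the version-1 subscription layout encoded and decoded above is: a 4-byte version, the 16-byte process id, then two length-prefixed task lists. A minimal sketch of the same layout, assuming each task id occupies 8 bytes (topic group id plus partition, which is what the 8-bytes-per-task allocation implies); task ids are represented as plain int pairs to keep the sketch self-contained:

import java.nio.ByteBuffer;
import java.util.Set;
import java.util.UUID;

// Sketch of the version-1 wire format used by SubscriptionInfo above.
final class SubscriptionLayoutSketch {
    static ByteBuffer encode(UUID processId, Set<int[]> prevTasks, Set<int[]> standbyTasks) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
        buf.putInt(1);                                    // version
        buf.putLong(processId.getMostSignificantBits());  // process id (16 bytes)
        buf.putLong(processId.getLeastSignificantBits());
        writeTasks(buf, prevTasks);                       // previously active tasks
        writeTasks(buf, standbyTasks);                    // standby tasks
        buf.rewind();
        return buf;
    }

    private static void writeTasks(ByteBuffer buf, Set<int[]> tasks) {
        buf.putInt(tasks.size());
        for (int[] task : tasks) {
            buf.putInt(task[0]);  // topic group id
            buf.putInt(task[1]);  // partition
        }
    }
}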

View File

@ -88,7 +88,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
final Deserializer<V> valDeserializer = serialization.valueDeserializer();
context.register(this, new StateRestoreCallback() {
context.register(this, loggingEnabled, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
inner.put(keyDeserializer.deserialize(name, key),

View File

@ -56,7 +56,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private Serdes<K, V> serdes;
private ProcessorContext context;
private String dbName;
private String dirName;
private RocksDB db;

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@ -37,13 +38,11 @@ public class StreamingConfigTest {
private Properties props = new Properties();
private StreamingConfig streamingConfig;
private StreamThread streamThreadPlaceHolder = null;
private StreamThread streamThreadPlaceHolder;
@Before
public void setUp() {
props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
props.put("group.id", "test-consumer-group");
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@ -53,18 +52,24 @@ public class StreamingConfigTest {
streamingConfig = new StreamingConfig(props);
}
@Test
public void testGetProducerConfigs() throws Exception {
Map<String, Object> returnedProps = streamingConfig.getProducerConfigs("client");
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
}
@Test
public void testGetConsumerConfigs() throws Exception {
Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder);
assertEquals(returnedProps.get("group.id"), "test-consumer-group");
Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
}
@Test
public void testGetRestoreConsumerConfigs() throws Exception {
Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs();
assertNull(returnedProps.get("group.id"));
Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs("client");
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
}
}
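The assertions above imply a naming convention: the producer, consumer, and restore consumer derive their client ids from the streaming client id, the consumer's group id is the job id, and the restore consumer has no group id at all. A hypothetical helper sketching that convention (not the actual StreamingConfig code):

import java.util.HashMap;
import java.util.Map;

// Illustrative only: shows the id derivation the test expects, using the literal config keys.
final class StreamClientIdsSketch {
    static Map<String, Object> producerConfigs(String clientId) {
        Map<String, Object> props = new HashMap<>();
        props.put("client.id", clientId + "-producer");
        return props;
    }

    static Map<String, Object> consumerConfigs(String jobId, String clientId) {
        Map<String, Object> props = new HashMap<>();
        props.put("client.id", clientId + "-consumer");
        props.put("group.id", jobId);  // the job id doubles as the consumer group
        return props;
    }

    static Map<String, Object> restoreConsumerConfigs(String clientId) {
        Map<String, Object> props = new HashMap<>();
        props.put("client.id", clientId + "-restore-consumer");
        return props;                  // no group.id: restore consumers manage offsets themselves
    }
}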

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import static org.apache.kafka.common.utils.Utils.mkSet;
import org.junit.Test;
@ -43,42 +44,39 @@ public class DefaultPartitionGrouperTest {
new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
@Test
public void testGrouping() {
PartitionGrouper grouper = new DefaultPartitionGrouper();
int topicGroupId;
Map<TaskId, Set<TopicPartition>> expected;
Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask;
Map<Integer, Set<String>> topicGroups;
topicGroups = new HashMap<>();
topicGroups.put(0, mkSet("topic1"));
topicGroups.put(1, mkSet("topic2"));
grouper.topicGroups(topicGroups);
expected = new HashMap<>();
expectedPartitionsForTask = new HashMap<>();
topicGroupId = 0;
expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
topicGroupId++;
expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
assertEquals(expected, grouper.partitionGroups(metadata));
assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
topicGroups = new HashMap<>();
topicGroups.put(0, mkSet("topic1", "topic2"));
grouper.topicGroups(topicGroups);
expected = new HashMap<>();
expectedPartitionsForTask = new HashMap<>();
topicGroupId = 0;
expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
assertEquals(expected, grouper.partitionGroups(metadata));
assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
}
}
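The expected maps above encode the default grouping rule: within a topic group, task i owns partition i of every topic that has it, and the number of tasks equals the largest partition count in the group. A standalone sketch of that rule, with partitionCounts standing in for the cluster metadata lookup:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Sketch of the default grouping: one task per partition id, up to the group's maximum.
final class GroupingSketch {
    static Map<Integer, List<String>> partitionGroups(Set<String> topics, Map<String, Integer> partitionCounts) {
        Map<Integer, List<String>> partitionsForTask = new TreeMap<>();
        int maxPartitions = 0;
        for (String topic : topics)
            maxPartitions = Math.max(maxPartitions, partitionCounts.get(topic));
        for (int partition = 0; partition < maxPartitions; partition++) {
            List<String> group = new ArrayList<>();
            for (String topic : topics)
                if (partition < partitionCounts.get(topic))
                    group.add(topic + "-" + partition);   // e.g. task 2 gets only topic1-2 above
            partitionsForTask.put(partition, group);
        }
        return partitionsForTask;
    }
}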

View File

@ -19,11 +19,13 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -208,12 +210,12 @@ public class TopologyBuilderTest {
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
Map<Integer, Set<String>> topicGroups = builder.topicGroups();
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
expectedTopicGroups.put(2, mkSet("topic-5"));
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet()));
expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet()));
expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet()));
assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
@ -235,18 +237,23 @@ public class TopologyBuilderTest {
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
builder.addStateStore(new MockStateStoreSupplier("strore-1", false), "processor-1", "processor-2");
builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
builder.addStateStore(new MockStateStoreSupplier("strore-2", false), "processor-3", "processor-4");
builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
Map<Integer, Set<String>> topicGroups = builder.topicGroups();
builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
builder.addStateStore(supplier);
builder.connectProcessorAndStateStores("processor-5", "store-3");
Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
expectedTopicGroups.put(2, mkSet("topic-5"));
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("store-1")));
expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), mkSet("store-2")));
expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), mkSet("store-3")));
assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
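TopicsInfo, as used above, pairs a group's source topics with the names of the state stores it owns, so the assignor can later derive changelog topics for them. An illustrative value holder matching the constructor calls in the test (field names are assumptions):

import java.util.Set;

// Illustrative stand-in for TopicsInfo: source topics plus the group's state store names.
final class TopicsInfoSketch {
    final Set<String> sourceTopics;
    final Set<String> stateStoreNames;

    TopicsInfoSketch(Set<String> sourceTopics, Set<String> stateStoreNames) {
        this.sourceTopics = sourceTopics;
        this.stateStoreNames = stateStoreNames;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicsInfoSketch))
            return false;
        TopicsInfoSketch other = (TopicsInfoSketch) o;
        return sourceTopics.equals(other.sourceTopics) && stateStoreNames.equals(other.stateStoreNames);
    }

    @Override
    public int hashCode() {
        return sourceTopics.hashCode() ^ stateStoreNames.hashCode();
    }
}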

View File

@ -35,9 +35,9 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -59,7 +59,10 @@ public class KafkaStreamingPartitionAssignorTest {
private TopicPartition t2p0 = new TopicPartition("topic2", 0);
private TopicPartition t2p1 = new TopicPartition("topic2", 1);
private TopicPartition t2p2 = new TopicPartition("topic2", 2);
private TopicPartition t2p3 = new TopicPartition("topic2", 3);
private TopicPartition t3p0 = new TopicPartition("topic3", 0);
private TopicPartition t3p1 = new TopicPartition("topic3", 1);
private TopicPartition t3p2 = new TopicPartition("topic3", 2);
private TopicPartition t3p3 = new TopicPartition("topic3", 3);
private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
@ -69,27 +72,15 @@ public class KafkaStreamingPartitionAssignorTest {
new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0])
new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
private ByteBuffer subscriptionUserData() {
UUID uuid = UUID.randomUUID();
ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + 4);
// version
buf.putInt(1);
// encode client clientUUID
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
// previously running tasks
buf.putInt(0);
// cached tasks
buf.putInt(0);
buf.rewind();
return buf;
}
private final TaskId task0 = new TaskId(0, 0);
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
@ -131,8 +122,9 @@ public class KafkaStreamingPartitionAssignorTest {
new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
UUID uuid = UUID.randomUUID();
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()) {
String clientId = "client-id";
UUID processId = UUID.randomUUID();
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
@Override
public Set<TaskId> prevTasks() {
return prevTasks;
@ -144,7 +136,7 @@ public class KafkaStreamingPartitionAssignorTest {
};
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread));
partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
@ -154,12 +146,12 @@ public class KafkaStreamingPartitionAssignorTest {
Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
standbyTasks.removeAll(prevTasks);
SubscriptionInfo info = new SubscriptionInfo(uuid, prevTasks, standbyTasks);
SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
assertEquals(info.encode(), subscription.userData());
}
@Test
public void testAssign() throws Exception {
public void testAssignBasic() throws Exception {
StreamingConfig config = new StreamingConfig(configProps());
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
@ -182,11 +174,13 @@ public class KafkaStreamingPartitionAssignorTest {
UUID uuid1 = UUID.randomUUID();
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
String client2 = "client2";
StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10));
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
@ -199,7 +193,6 @@ public class KafkaStreamingPartitionAssignorTest {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
// check assigned partitions
assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
@ -216,8 +209,7 @@ public class KafkaStreamingPartitionAssignorTest {
AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
allActiveTasks.addAll(info11.activeTasks);
// check active tasks assigned to the first client
assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
// the third consumer
AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
@ -227,7 +219,135 @@ public class KafkaStreamingPartitionAssignorTest {
assertEquals(allTasks, new HashSet<>(allActiveTasks));
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
assertEquals(allTasks, allActiveTasks);
}
@Test
public void testAssignWithNewTasks() throws Exception {
StreamingConfig config = new StreamingConfig(configProps());
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addSource("source3", "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
// assuming that previous tasks do not have topic3
final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
UUID uuid1 = UUID.randomUUID();
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
String client2 = "client2";
StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
subscriptions.put("consumer11",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
subscriptions.put("consumer20",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
// check assigned partitions: since there is no previous task for topic 3, its partitions are assigned arbitrarily, so we cannot check for an exact match
// also note that previously assigned partitions / tasks may not stay on their previous host, since the new tasks may be assigned first and
// the remaining ones then rebalanced onto other hosts
Set<TaskId> allActiveTasks = new HashSet<>();
Set<TopicPartition> allPartitions = new HashSet<>();
AssignmentInfo info;
info = AssignmentInfo.decode(assignments.get("consumer10").userData());
allActiveTasks.addAll(info.activeTasks);
allPartitions.addAll(assignments.get("consumer10").partitions());
info = AssignmentInfo.decode(assignments.get("consumer11").userData());
allActiveTasks.addAll(info.activeTasks);
allPartitions.addAll(assignments.get("consumer11").partitions());
info = AssignmentInfo.decode(assignments.get("consumer20").userData());
allActiveTasks.addAll(info.activeTasks);
allPartitions.addAll(assignments.get("consumer20").partitions());
assertEquals(allTasks, allActiveTasks);
assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
}
@Test
public void testAssignWithStates() throws Exception {
StreamingConfig config = new StreamingConfig(configProps());
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
List<String> topics = Utils.mkList("topic1", "topic2");
TaskId task00 = new TaskId(0, 0);
TaskId task01 = new TaskId(0, 1);
TaskId task02 = new TaskId(0, 2);
TaskId task10 = new TaskId(1, 0);
TaskId task11 = new TaskId(1, 1);
TaskId task12 = new TaskId(1, 2);
UUID uuid1 = UUID.randomUUID();
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
String client2 = "client2";
StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
subscriptions.put("consumer11",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
subscriptions.put("consumer20",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
// check assigned partition size: since there are no previous tasks and there are two sub-topologies, the assignment is non-deterministic, so we cannot check for an exact match
assertEquals(2, assignments.get("consumer10").partitions().size());
assertEquals(2, assignments.get("consumer11").partitions().size());
assertEquals(2, assignments.get("consumer20").partitions().size());
assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
// check tasks for state topics
assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
}
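tasksForState above collects, per state store, the tasks of the sub-topology that owns it; in the assignor this set is presumably what sizes the store's changelog topic, one partition per task. A sketch of that bookkeeping under that assumption, with strings in place of TaskId:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Assumed shape of the tasks-per-store bookkeeping exercised by the assertions above.
final class ChangelogPartitionCountSketch {
    static void recordTask(Map<String, Set<String>> tasksForState, String store, String taskId) {
        Set<String> tasks = tasksForState.get(store);
        if (tasks == null) {
            tasks = new HashSet<>();
            tasksForState.put(store, tasks);
        }
        tasks.add(taskId);
    }

    // number of changelog partitions the store needs, assuming one partition per owning task
    static int partitionsFor(Map<String, Set<String>> tasksForState, String store) {
        Set<String> tasks = tasksForState.get(store);
        return tasks == null ? 0 : tasks.size();
    }
}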
@Test
@ -257,11 +377,13 @@ public class KafkaStreamingPartitionAssignorTest {
UUID uuid1 = UUID.randomUUID();
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
String client2 = "client2";
StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10));
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
@ -298,10 +420,10 @@ public class KafkaStreamingPartitionAssignorTest {
// all task ids are in the active tasks and also in the standby tasks
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
assertEquals(allTasks, allActiveTasks);
assertEquals(3, allStandbyTasks.size());
assertEquals(allTasks, new HashSet<>(allStandbyTasks));
assertEquals(allTasks, allStandbyTasks);
}
private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
@ -354,17 +476,20 @@ public class KafkaStreamingPartitionAssignorTest {
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
TopicPartition t2p3 = new TopicPartition("topic2", 3);
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
UUID uuid = UUID.randomUUID();
String client1 = "client1";
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime());
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread));
partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
List<TaskId> activeTaskList = Utils.mkList(task0, task3);
Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@ -375,8 +500,8 @@ public class KafkaStreamingPartitionAssignorTest {
PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
partitionAssignor.onAssignment(assignment);
assertEquals(Utils.mkSet(task0), partitionAssignor.taskIds(t1p0));
assertEquals(Utils.mkSet(task3), partitionAssignor.taskIds(t2p3));
assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
assertEquals(standbyTasks, partitionAssignor.standbyTasks());
}
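The onAssignment assertions above rely on the assigned partition list and the active task list in AssignmentInfo being parallel, so partition i maps to task i. A sketch of that pairing with plain strings in place of TopicPartition and TaskId:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the partition-to-task index built from parallel lists in onAssignment.
final class AssignmentPairingSketch {
    static Map<String, Set<String>> tasksForPartitions(List<String> partitions, List<String> activeTasks) {
        Map<String, Set<String>> partitionToTaskIds = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            Set<String> tasks = partitionToTaskIds.get(partitions.get(i));
            if (tasks == null) {
                tasks = new HashSet<>();
                partitionToTaskIds.put(partitions.get(i), tasks);
            }
            tasks.add(activeTasks.get(i));  // the i-th partition belongs to the i-th active task
        }
        return partitionToTaskIds;
    }
}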

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@ -178,14 +179,21 @@ public class ProcessorStateManagerTest {
}
}
private final String jobId = "test-job";
private final String stateDir = "test";
private final String persistentStoreName = "persistentStore";
private final String nonPersistentStoreName = "nonPersistentStore";
private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, persistentStoreName);
private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, nonPersistentStoreName);
@Test
public void testLockStateDirectory() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
File baseDir = Files.createTempDirectory(stateDir).toFile();
try {
FileLock lock;
// the state manager locks the directory
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
try {
// this should not get the lock
@ -208,15 +216,15 @@ public class ProcessorStateManagerTest {
}
}
@Test(expected = IllegalStateException.class)
@Test(expected = KafkaException.class)
public void testNoTopic() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
File baseDir = Files.createTempDirectory(stateDir).toFile();
try {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
try {
stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
} finally {
stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
@ -227,41 +235,42 @@ public class ProcessorStateManagerTest {
@Test
public void testRegisterPersistentStore() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
File baseDir = Files.createTempDirectory(stateDir).toFile();
try {
long lastCheckpointedOffset = 10L;
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0])
ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
));
TopicPartition partition = new TopicPartition("persistentStore", 2);
TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); // persistent store
try {
restoreConsumer.reset();
ArrayList<Integer> expectedKeys = new ArrayList<>();
long offset = -1L;
long offset;
for (int i = 1; i <= 3; i++) {
offset = (long) i;
int key = i * 10;
expectedKeys.add(key);
restoreConsumer.bufferRecord(
new ConsumerRecord<>("persistentStore", 2, offset, key, 0)
new ConsumerRecord<>(persistentStoreTopicName, 2, offset, key, 0)
);
}
stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition);
assertEquals(new TopicPartition(persistentStoreTopicName, 2), restoreConsumer.assignedPartition);
assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
assertFalse(restoreConsumer.seekToBeginingCalled);
assertTrue(restoreConsumer.seekToEndCalled);
@ -278,24 +287,26 @@ public class ProcessorStateManagerTest {
@Test
public void testRegisterNonPersistentStore() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
File baseDir = Files.createTempDirectory(stateDir).toFile();
try {
long lastCheckpointedOffset = 10L;
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0])
ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
));
TopicPartition partition = new TopicPartition("persistentStore", 2);
TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
try {
restoreConsumer.reset();
@ -306,13 +317,13 @@ public class ProcessorStateManagerTest {
int key = i;
expectedKeys.add(i);
restoreConsumer.bufferRecord(
new ConsumerRecord<>("nonPersistentStore", 2, offset, key, 0)
new ConsumerRecord<>(nonPersistentStoreTopicName, 2, offset, key, 0)
);
}
stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition);
assertEquals(new TopicPartition(nonPersistentStoreTopicName, 2), restoreConsumer.assignedPartition);
assertEquals(0L, restoreConsumer.seekOffset);
assertTrue(restoreConsumer.seekToBeginingCalled);
assertTrue(restoreConsumer.seekToEndCalled);
@ -328,37 +339,44 @@ public class ProcessorStateManagerTest {
@Test
public void testChangeLogOffsets() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
File baseDir = Files.createTempDirectory(stateDir).toFile();
try {
long lastCheckpointedOffset = 10L;
String storeName1 = "store1";
String storeName2 = "store2";
String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition("store1", 0), lastCheckpointedOffset));
checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
restoreConsumer.updatePartitions("store1", Utils.mkList(
new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0])
ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, baseDir, restoreConsumer, true); // standby
restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
));
restoreConsumer.updatePartitions("store2", Utils.mkList(
new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0])
restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList(
new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])
));
TopicPartition partition1 = new TopicPartition("store1", 0);
TopicPartition partition2 = new TopicPartition("store2", 0);
TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(partition1, 13L);
endOffsets.put(partition2, 17L);
restoreConsumer.updateEndOffsets(endOffsets);
MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore("store1", true);
MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore("store2", true);
MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
ProcessorStateManager stateMgr = new ProcessorStateManager(0, baseDir, restoreConsumer, true); // standby
try {
restoreConsumer.reset();
stateMgr.register(store1, store1.stateRestoreCallback);
stateMgr.register(store2, store2.stateRestoreCallback);
stateMgr.register(store1, true, store1.stateRestoreCallback);
stateMgr.register(store2, true, store2.stateRestoreCallback);
Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
@ -379,21 +397,22 @@ public class ProcessorStateManagerTest {
@Test
public void testGetStore() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
File baseDir = Files.createTempDirectory(stateDir).toFile();
try {
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
restoreConsumer.updatePartitions("mockStore", Utils.mkList(
new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
));
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
try {
stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
assertNull(stateMgr.getStore("noSuchStore"));
assertEquals(mockStateStore, stateMgr.getStore("mockStore"));
assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
} finally {
stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@ -405,7 +424,7 @@ public class ProcessorStateManagerTest {
@Test
public void testClose() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
File baseDir = Files.createTempDirectory(stateDir).toFile();
File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME);
try {
// write an empty checkpoint file
@ -413,32 +432,33 @@ public class ProcessorStateManagerTest {
oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0])
ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
));
restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0])
restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
));
// set up ack'ed offsets
HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
ackedOffsets.put(new TopicPartition("persistentStore", 1), 123L);
ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(jobId, "otherTopic"), 1), 789L);
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
try {
// make sure the checkpoint file is deleted
assertFalse(checkpointFile.exists());
restoreConsumer.reset();
stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
restoreConsumer.reset();
stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
} finally {
// close the state manager with the ack'ed offsets
stateMgr.close(ackedOffsets);
@ -455,7 +475,7 @@ public class ProcessorStateManagerTest {
OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
assertEquals(1, checkpointedOffsets.size());
assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition("persistentStore", 1)));
assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
} finally {
Utils.delete(baseDir);
}
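The tests above derive changelog topic names per job via ProcessorStateManager.storeChangelogTopic. A minimal sketch of that naming, assuming the suffix constant resolves to "-changelog" (the exact value lives in ProcessorStateManager):

// Sketch of the per-job changelog topic naming used throughout the tests above.
final class ChangelogNamingSketch {
    static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; // assumed value

    static String storeChangelogTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }
}
// e.g. storeChangelogTopic("test-job", "persistentStore") -> "test-job-persistentStore-changelog"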

View File

@ -53,16 +53,22 @@ public class StandbyTaskTest {
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final TopicPartition partition1 = new TopicPartition("store1", 1);
private final TopicPartition partition2 = new TopicPartition("store2", 1);
private final String jobId = "test-job";
private final String storeName1 = "store1";
private final String storeName2 = "store2";
private final String storeChangelogTopicName1 = jobId + "-" + storeName1 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
private final String storeChangelogTopicName2 = jobId + "-" + storeName2 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
private final Set<TopicPartition> topicPartitions = Collections.emptySet();
private final ProcessorTopology topology = new ProcessorTopology(
Collections.<ProcessorNode>emptyList(),
Collections.<String, SourceNode>emptyMap(),
Utils.<StateStoreSupplier>mkList(
new MockStateStoreSupplier(partition1.topic(), false),
new MockStateStoreSupplier(partition2.topic(), true)
new MockStateStoreSupplier(storeName1, false),
new MockStateStoreSupplier(storeName2, true)
)
);
@ -91,25 +97,31 @@ public class StandbyTaskTest {
@Before
public void setup() {
restoreStateConsumer.reset();
restoreStateConsumer.updatePartitions("store1", Utils.mkList(
new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("store1", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("store1", 2, Node.noNode(), new Node[0], new Node[0])
restoreStateConsumer.updatePartitions(storeChangelogTopicName1, Utils.mkList(
new PartitionInfo(storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(storeChangelogTopicName1, 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])
));
restoreStateConsumer.updatePartitions("store2", Utils.mkList(
new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("store2", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("store2", 2, Node.noNode(), new Node[0], new Node[0])
System.out.println("added " + storeChangelogTopicName1);
restoreStateConsumer.updatePartitions(storeChangelogTopicName2, Utils.mkList(
new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
));
System.out.println("added " + storeChangelogTopicName2);
}
@Test
public void testStorePartitions() throws Exception {
System.out.println("STARTED");
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
@ -124,7 +136,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@ -143,7 +155,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@ -168,9 +180,9 @@ public class StandbyTaskTest {
StandbyContextImpl context = (StandbyContextImpl) task.context();
MockStateStoreSupplier.MockStateStore store1 =
(MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition1.topic());
(MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName1);
MockStateStoreSupplier.MockStateStore store2 =
(MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition2.topic());
(MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName2);
assertEquals(Collections.emptyList(), store1.keys);
assertEquals(Utils.mkList(1, 2, 3), store2.keys);

View File

@ -103,7 +103,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
StreamTask task = new StreamTask(new TaskId(0, 0), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@ -154,7 +154,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
StreamTask task = new StreamTask(new TaskId(1, 1), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),

View File

@ -59,7 +59,8 @@ import java.util.UUID;
public class StreamThreadTest {
private UUID uuid = UUID.randomUUID();
private String clientId = "clientId";
private UUID processId = UUID.randomUUID();
private TopicPartition t1p1 = new TopicPartition("topic1", 1);
private TopicPartition t1p2 = new TopicPartition("topic1", 2);
@ -90,7 +91,7 @@ public class StreamThreadTest {
ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + 4);
// version
buf.putInt(1);
// encode client clientUUID
// encode client processId
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
// previously running tasks
@ -132,7 +133,7 @@ public class StreamThreadTest {
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config) {
super(id, partitions, topology, consumer, producer, restoreConsumer, config, null);
super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
}
@Override
@ -159,7 +160,7 @@ public class StreamThreadTest {
builder.addSource("source3", "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()) {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
@ -279,7 +280,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), mockTime) {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
@Override
public void maybeClean() {
super.maybeClean();
@ -401,7 +402,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), mockTime) {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
@Override
public void maybeCommit() {
super.maybeCommit();
@ -471,7 +472,7 @@ public class StreamThreadTest {
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread));
partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
Map<String, PartitionAssignor.Assignment> assignments =
partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));

View File

@ -31,13 +31,14 @@ public class SubscriptionInfoTest {
@Test
public void testEncodeDecode() {
UUID clientUUID = UUID.randomUUID();
UUID processId = UUID.randomUUID();
Set<TaskId> activeTasks =
new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
Set<TaskId> standbyTasks =
new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
SubscriptionInfo info = new SubscriptionInfo(clientUUID, activeTasks, standbyTasks);
SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks);
SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(info, decoded);

View File

@ -269,7 +269,7 @@ public class KeyValueStoreTestDriver<K, V> {
}
@Override
public void register(StateStore store, StateRestoreCallback func) {
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
storeMap.put(store.name(), store);
restoreEntries(func);
}

View File

@ -127,7 +127,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
}
@Override
public void register(StateStore store, StateRestoreCallback func) {
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
storeMap.put(store.name(), store);
}

View File

@ -66,7 +66,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
@Override
public void init(ProcessorContext context) {
context.register(this, stateRestoreCallback);
context.register(this, true, stateRestoreCallback);
initialized = true;
}
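All three mocks above adopt the same new registration contract: a store now passes a loggingEnabled flag when it registers with the processor context, so the context knows whether to back the store with a changelog topic. A pared-down illustration of that contract (interface names assumed):

import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the register(store, loggingEnabled, callback) contract shown above.
interface RestoreCallbackSketch {
    void restore(byte[] key, byte[] value);
}

interface RegisteringContextSketch {
    void register(Object store, boolean loggingEnabled, RestoreCallbackSketch callback);
}

final class LoggedStoreSketch {
    final List<byte[]> restoredKeys = new ArrayList<>();

    void init(RegisteringContextSketch context) {
        // register with logging enabled; the callback replays changelog records on restore
        context.register(this, true, new RestoreCallbackSketch() {
            @Override
            public void restore(byte[] key, byte[] value) {
                restoredKeys.add(key);
            }
        });
    }
}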

View File

@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
@ -124,6 +125,8 @@ public class ProcessorTopologyTestDriver {
private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
private final String jobId = "test-driver-job";
private final TaskId id;
private final ProcessorTopology topology;
private final StreamTask task;
@ -158,6 +161,7 @@ public class ProcessorTopologyTestDriver {
}
task = new StreamTask(id,
jobId,
partitionsByTopic.values(),
topology,
consumer,
@ -324,12 +328,12 @@ public class ProcessorTopologyTestDriver {
};
// For each store name ...
for (String storeName : storeNames) {
String topicName = storeName;
String topicName = jobId + "-" + storeName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
// Set up the restore-state topic ...
// consumer.subscribe(new TopicPartition(topicName, 1));
// Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
List<PartitionInfo> partitionInfos = new ArrayList<>();
partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
partitionInfos.add(new PartitionInfo(topicName , id.partition, null, null, null));
consumer.updatePartitions(topicName, partitionInfos);
consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
}
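The loop above makes each store's per-job changelog topic visible to the restore consumer and gives it an end offset so restoration can complete. A condensed sketch of the same wiring using the MockConsumer test utility, with the changelog suffix value assumed:

import java.util.Collections;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Condensed restore-consumer wiring: register the changelog partition and its end offset.
final class RestoreConsumerSetupSketch {
    static void prepare(MockConsumer<byte[], byte[]> restoreConsumer, String jobId, String storeName, int partition) {
        String topic = jobId + "-" + storeName + "-changelog"; // assumed suffix value
        restoreConsumer.updatePartitions(topic,
                Collections.singletonList(new PartitionInfo(topic, partition, null, null, null)));
        restoreConsumer.updateEndOffsets(
                Collections.singletonMap(new TopicPartition(topic, partition), 0L));
    }
}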

View File

@ -49,7 +49,7 @@ public class UnlimitedWindowDef<K, V> implements WindowSupplier<K, V> {
@Override
public void init(ProcessorContext context) {
context.register(this, null);
context.register(this, true, null);
}
@Override