diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index fcf442b8cfa..2551338e094 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -57,7 +57,7 @@ do CLASSPATH=$CLASSPATH:$file done -for file in $base_dir/stream/build/libs/kafka-streams*.jar; +for file in $base_dir/streams/build/libs/kafka-streams*.jar; do CLASSPATH=$CLASSPATH:$file done diff --git a/build.gradle b/build.gradle index e24279ea6ba..224f550403c 100644 --- a/build.gradle +++ b/build.gradle @@ -561,6 +561,8 @@ project(':streams') { compile project(':clients') compile "$slf4jlog4j" compile 'org.rocksdb:rocksdbjni:3.10.1' + compile 'com.101tec:zkclient:0.7' // this dependency should be removed after KIP-4 + compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" // this dependency should be removed after KIP-4 testCompile "$junit" testCompile project(path: ':clients', configuration: 'archives') diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index e221dce633b..a65a2dc3ad0 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -129,6 +129,14 @@ + + + + + + + + diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java index fc1fdae2f1b..0d99739959a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java @@ -73,9 +73,7 @@ public class KafkaStreaming { private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class); private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1); - private static final String JMX_PREFIX = "kafka.streaming"; - - private final Time time; + private static final String JMX_PREFIX = "kafka.streams"; // container states private static final int CREATED = 0; @@ -85,29 +83,39 @@ public class KafkaStreaming { private final StreamThread[] threads; - private String clientId; - private final UUID uuid; - private final Metrics metrics; + // processId is expected to be unique across JVMs and to be used + // in userData of the subscription request to allow assignor be aware + // of the co-location of stream thread's consumers. It is for internal + // usage only and should not be exposed to users at all. 
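With this change a job is identified by job.id (zookeeper.connect is only needed for the interim topic management) instead of a user-supplied client.id. A minimal start-up sketch, mirroring the KStreamJob/ProcessorJob examples further down and assuming the StreamingConfig(Properties) constructor and KafkaStreaming#start() referenced elsewhere in this patch:

import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreaming;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class StartupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamingConfig.JOB_ID_CONFIG, "example-job");              // group id, client-id prefix, changelog prefix
        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); // interim, until KIP-4
        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
        builder.addSink("SINK", "topic-sink", new StringSerializer(), new StringSerializer(), "SOURCE");

        // processId is generated internally per JVM; clientId defaults to "<job.id>-<n>"
        KafkaStreaming streaming = new KafkaStreaming(builder, new StreamingConfig(props));
        streaming.start();
    }
}

The job id also becomes the consumer group id, and each thread's clients get client ids suffixed with -consumer, -producer and -restore-consumer, as shown in the StreamingConfig changes below.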
+ private final UUID processId; public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception { // create the metrics - this.time = new SystemTime(); - this.uuid = UUID.randomUUID(); + Time time = new SystemTime(); + + this.processId = UUID.randomUUID(); + + String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG); + if (jobId.length() <= 0) + jobId = "kafka-streams"; + + String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG); + if (clientId.length() <= 0) + clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement(); + + List reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(JMX_PREFIX)); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); - clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG); - if (clientId.length() <= 0) - clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement(); - List reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - reporters.add(new JmxReporter(JMX_PREFIX)); - this.metrics = new Metrics(metricConfig, reporters, time); + + Metrics metrics = new Metrics(metricConfig, reporters, time); this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)]; for (int i = 0; i < this.threads.length; i++) { - this.threads[i] = new StreamThread(builder, config, this.clientId, this.uuid, this.metrics, this.time); + this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java index 437afd8b81e..e89d03086da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java @@ -42,6 +42,10 @@ public class StreamingConfig extends AbstractConfig { public static final String STATE_DIR_CONFIG = "state.dir"; private static final String STATE_DIR_DOC = "Directory location for state store."; + /** zookeeper.connect */ + public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; + private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management."; + /** commit.interval.ms */ public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; @@ -83,8 +87,9 @@ public class StreamingConfig extends AbstractConfig { public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the PartitionGrouper interface."; - /** client.id */ - public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; + /** job.id */ + public static final String JOB_ID_CONFIG = "job.id"; + public static final String JOB_ID_DOC = "An id string to identify the stream job.
It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; /** key.serializer */ public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; @@ -107,19 +112,30 @@ public class StreamingConfig extends AbstractConfig { /** metric.reporters */ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; - /** - * bootstrap.servers - */ + /** bootstrap.servers */ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + /** client.id */ + public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; + private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir"); static { - CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG, + CONFIG = new ConfigDef().define(JOB_ID_CONFIG, + Type.STRING, + "", + Importance.MEDIUM, + StreamingConfig.JOB_ID_DOC) + .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) + .define(ZOOKEEPER_CONNECT_CONFIG, + Type.STRING, + "", + Importance.HIGH, + StreamingConfig.ZOOKEEPER_CONNECT_DOC) .define(STATE_DIR_CONFIG, Type.STRING, SYSTEM_TEMP_DIRECTORY, @@ -221,20 +237,27 @@ public class StreamingConfig extends AbstractConfig { super(CONFIG, props); } - public Map getConsumerConfigs(StreamThread streamThread) { + public Map getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) { Map props = getBaseConsumerConfigs(); + + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG)); - props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName()); + + props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread); + return props; } - public Map getRestoreConsumerConfigs() { + public Map getRestoreConsumerConfigs(String clientId) { Map props = getBaseConsumerConfigs(); - // no group id for a restore consumer + // no need to set group id for a restore consumer props.remove(ConsumerConfig.GROUP_ID_CONFIG); + props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer"); + return props; } @@ -248,11 +271,12 @@ public class StreamingConfig extends AbstractConfig { props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG); props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG); props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG); return props; } - public Map getProducerConfigs() { + public Map getProducerConfigs(String clientId) { Map props = this.originals(); // set producer default property values @@ -263,6 +287,8 @@ public class StreamingConfig extends AbstractConfig { props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG); props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); + return props; } diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java index 87368c1c465..819bd687b18 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java @@ -34,7 +34,7 @@ public class KStreamJob { public static void main(String[] args) throws Exception { Properties props = new Properties(); - props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job"); + props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream"); props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java index 882c7ed0fbd..2d0b79fa075 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java @@ -49,7 +49,7 @@ public class ProcessorJob { public void init(ProcessorContext context) { this.context = context; this.context.schedule(1000); - this.kvStore = (KeyValueStore) context.getStateStore("local-state"); + this.kvStore = (KeyValueStore) context.getStateStore("LOCAL-STATE"); } @Override @@ -90,8 +90,9 @@ public class ProcessorJob { public static void main(String[] args) throws Exception { Properties props = new Properties(); - props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job"); + props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor"); props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -104,8 +105,7 @@ public class ProcessorJob { builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source"); builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE"); - builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build()); - builder.connectProcessorAndStateStores("local-state", "PROCESS"); + builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS"); builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java index 0cf969f744f..80e548f94c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java @@ -85,7 +85,7 @@ public class SlidingWindowSupplier implements WindowSupplier { this.context = context; this.partition = context.id().partition; SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback(); - context.register(this, restoreFunc); + context.register(this, true, restoreFunc); for (ValueList valueList : map.values()) { valueList.clearDirtyValues(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 5b2b0313502..47c9b09ee6c 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -177,7 +177,8 @@ public class KTableImpl implements KTable { if (!source.isMaterialized()) { StateStoreSupplier storeSupplier = new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null); - topology.addStateStore(storeSupplier, name); + // mark this state is non internal hence it is read directly from a user topic + topology.addStateStore(storeSupplier, false, name); source.materialize(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 7d2188a2f2c..923a2173a99 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -29,9 +29,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -public class DefaultPartitionGrouper extends PartitionGrouper { +public class DefaultPartitionGrouper implements PartitionGrouper { - public Map> partitionGroups(Cluster metadata) { + public Map> partitionGroups(Map> topicGroups, Cluster metadata) { Map> groups = new HashMap<>(); for (Map.Entry> entry : topicGroups.entrySet()) { @@ -71,3 +71,6 @@ public class DefaultPartitionGrouper extends PartitionGrouper { } } + + + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index 187c4ce11f4..a40a1c42aa4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -19,39 +19,18 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor; import java.util.Map; import java.util.Set; -public abstract class PartitionGrouper { - - protected Map> topicGroups; - - private KafkaStreamingPartitionAssignor partitionAssignor = null; +public interface PartitionGrouper { /** * Returns a map of task ids to groups of partitions. 
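A minimal custom grouper against the new stateless interface might look like the sketch below; the generic types are assumed to be Map<Integer, Set<String>> for the subscribed topic groups and Map<TaskId, Set<TopicPartition>> for the result, matching DefaultPartitionGrouper above (the implementation is plugged in via the partition.grouper config):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

public class OneTaskPerPartitionGrouper implements PartitionGrouper {

    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();

        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
            int topicGroupId = entry.getKey();

            for (String topic : entry.getValue()) {
                // one task per partition number; topics in the same group sharing a partition number share a task
                for (PartitionInfo info : metadata.partitionsForTopic(topic)) {
                    TaskId taskId = new TaskId(topicGroupId, info.partition());
                    Set<TopicPartition> partitions = groups.get(taskId);
                    if (partitions == null) {
                        partitions = new HashSet<>();
                        groups.put(taskId, partitions);
                    }
                    partitions.add(new TopicPartition(topic, info.partition()));
                }
            }
        }
        return groups;
    }
}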
* - * @param metadata + * @param topicGroups The subscribed topic groups + * @param metadata Metadata of the consuming cluster * @return a map of task ids to groups of partitions */ - public abstract Map> partitionGroups(Cluster metadata); - - public void topicGroups(Map> topicGroups) { - this.topicGroups = topicGroups; - } - - public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) { - this.partitionAssignor = partitionAssignor; - } - - public Set taskIds(TopicPartition partition) { - return partitionAssignor.taskIds(partition); - } - - public Map> standbyTasks() { - return partitionAssignor.standbyTasks(); - } - -} + Map> partitionGroups(Map> topicGroups, Cluster metadata); +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 88ac64e29b0..fa19ed798a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -79,7 +79,7 @@ public interface ProcessorContext { * * @param store the storage engine */ - void register(StateStore store, StateRestoreCallback stateRestoreCallback); + void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback); StateStore getStateStore(String name); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 021a47ffd20..3cfb22bd936 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -45,14 +45,17 @@ import java.util.Set; * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes, * processes that message, and optionally forwarding new messages to one or all of its children. Finally, a {@link SinkNode sink} * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you - * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance - * that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}. + * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming} + * instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}. 
*/ public class TopologyBuilder { // node factories in a topological order private final LinkedHashMap nodeFactories = new LinkedHashMap<>(); + // state factories + private final Map stateFactories = new HashMap<>(); + private final Set sourceTopicNames = new HashSet<>(); private final QuickUnion nodeGrouper = new QuickUnion<>(); @@ -60,8 +63,18 @@ public class TopologyBuilder { private final HashMap nodeToTopics = new HashMap<>(); private Map> nodeGroups = null; - private Map stateStores = new HashMap<>(); - private Map> stateStoreUsers = new HashMap(); + private static class StateStoreFactory { + public final Set users; + + public final boolean isInternal; + public final StateStoreSupplier supplier; + + StateStoreFactory(boolean isInternal, StateStoreSupplier supplier) { + this.isInternal = isInternal; + this.supplier = supplier; + this.users = new HashSet<>(); + } + } private static abstract class NodeFactory { public final String name; @@ -88,6 +101,7 @@ public class TopologyBuilder { stateStoreNames.add(stateStoreName); } + @SuppressWarnings("unchecked") @Override public ProcessorNode build() { return new ProcessorNode(name, supplier.get(), stateStoreNames); @@ -106,6 +120,7 @@ public class TopologyBuilder { this.valDeserializer = valDeserializer; } + @SuppressWarnings("unchecked") @Override public ProcessorNode build() { return new SourceNode(name, keyDeserializer, valDeserializer); @@ -125,12 +140,40 @@ public class TopologyBuilder { this.keySerializer = keySerializer; this.valSerializer = valSerializer; } + + @SuppressWarnings("unchecked") @Override public ProcessorNode build() { return new SinkNode(name, topic, keySerializer, valSerializer); } } + public static class TopicsInfo { + public Set sourceTopics; + public Set stateNames; + + public TopicsInfo(Set sourceTopics, Set stateNames) { + this.sourceTopics = sourceTopics; + this.stateNames = stateNames; + } + + @Override + public boolean equals(Object o) { + if (o instanceof TopicsInfo) { + TopicsInfo other = (TopicsInfo) o; + return other.sourceTopics.equals(this.sourceTopics) && other.stateNames.equals(this.stateNames); + } else { + return false; + } + } + + @Override + public int hashCode() { + long n = ((long) sourceTopics.hashCode() << 32) | (long) stateNames.hashCode(); + return (int) (n % 0xFFFFFFFFL); + } + } + /** * Create a new builder. */ @@ -138,9 +181,9 @@ public class TopologyBuilder { /** * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. - * The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and - * {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamingConfig streaming configuration}. + * The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}. * * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. @@ -158,11 +201,11 @@ public class TopologyBuilder { * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source - * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the - * {@link StreamingConfig streaming configuration} + * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the + * {@link org.apache.kafka.streams.StreamingConfig streaming configuration} * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source - * should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamingConfig streaming configuration} + * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamingConfig streaming configuration} * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ @@ -186,9 +229,9 @@ public class TopologyBuilder { /** * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. - * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and - * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the - * {@link StreamingConfig streaming configuration}. + * The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and + * {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the + * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}. 
* * @param name the unique name of the sink * @param topic the name of the Kafka topic to which this sink should write its messages @@ -205,11 +248,11 @@ public class TopologyBuilder { * @param name the unique name of the sink * @param topic the name of the Kafka topic to which this sink should write its messages * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink - * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the - * {@link StreamingConfig streaming configuration} + * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the + * {@link org.apache.kafka.streams.StreamingConfig streaming configuration} * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink - * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the - * {@link StreamingConfig streaming configuration} + * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the + * {@link org.apache.kafka.streams.StreamingConfig streaming configuration} * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume * and write to its topic * @return this builder instance so methods can be chained together; never null @@ -271,12 +314,12 @@ public class TopologyBuilder { * @param supplier the supplier used to obtain this state store {@link StateStore} instance * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) { - if (stateStores.containsKey(supplier.name())) { + public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) { + if (stateFactories.containsKey(supplier.name())) { throw new TopologyException("StateStore " + supplier.name() + " is already added."); } - stateStores.put(supplier.name(), supplier); - stateStoreUsers.put(supplier.name(), new HashSet()); + + stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier)); if (processorNames != null) { for (String processorName : processorNames) { @@ -287,6 +330,16 @@ public class TopologyBuilder { return this; } + /** + * Adds a state store + * + * @param supplier the supplier used to obtain this state store {@link StateStore} instance + * @return this builder instance so methods can be chained together; never null + */ + public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... 
processorNames) { + return this.addStateStore(supplier, true, processorNames); + } + /** * Connects the processor and the state stores * @@ -305,22 +358,22 @@ public class TopologyBuilder { } private void connectProcessorAndStateStore(String processorName, String stateStoreName) { - if (!stateStores.containsKey(stateStoreName)) + if (!stateFactories.containsKey(stateStoreName)) throw new TopologyException("StateStore " + stateStoreName + " is not added yet."); if (!nodeFactories.containsKey(processorName)) throw new TopologyException("Processor " + processorName + " is not added yet."); - Set users = stateStoreUsers.get(stateStoreName); - Iterator iter = users.iterator(); + StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName); + Iterator iter = stateStoreFactory.users.iterator(); if (iter.hasNext()) { String user = iter.next(); nodeGrouper.unite(user, processorName); } - users.add(processorName); + stateStoreFactory.users.add(processorName); - NodeFactory factory = nodeFactories.get(processorName); - if (factory instanceof ProcessorNodeFactory) { - ((ProcessorNodeFactory) factory).addStateStore(stateStoreName); + NodeFactory nodeFactory = nodeFactories.get(processorName); + if (nodeFactory instanceof ProcessorNodeFactory) { + ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName); } else { throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node."); } @@ -332,20 +385,32 @@ public class TopologyBuilder { * * @return groups of topic names */ - public Map> topicGroups() { - Map> topicGroups = new HashMap<>(); + public Map topicGroups() { + Map topicGroups = new HashMap<>(); if (nodeGroups == null) nodeGroups = makeNodeGroups(); for (Map.Entry> entry : nodeGroups.entrySet()) { - Set topicGroup = new HashSet<>(); + Set sourceTopics = new HashSet<>(); + Set stateNames = new HashSet<>(); for (String node : entry.getValue()) { + // if the node is a source node, add to the source topics String[] topics = nodeToTopics.get(node); if (topics != null) - topicGroup.addAll(Arrays.asList(topics)); + sourceTopics.addAll(Arrays.asList(topics)); + + // if the node is connected to a state, add to the state topics + for (StateStoreFactory stateFactory : stateFactories.values()) { + + if (stateFactory.isInternal && stateFactory.users.contains(node)) { + stateNames.add(stateFactory.supplier.name()); + } + } } - topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup)); + topicGroups.put(entry.getKey(), new TopicsInfo( + Collections.unmodifiableSet(sourceTopics), + Collections.unmodifiableSet(stateNames))); } return Collections.unmodifiableMap(topicGroups); @@ -431,9 +496,9 @@ public class TopologyBuilder { /** * Build the topology for the specified topic group. This is called automatically when passing this builder into the - * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor. + * {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor. 
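For illustration, a topology using the two addStateStore variants might look like the sketch below (MyProcessorSupplier stands for any ProcessorSupplier, e.g. the one in the ProcessorJob example above). The two-argument form keeps the store internal, so its changelog topic is derived as <job.id>-<store name>-changelog, while passing isInternal = false (as KTableImpl does) means the store is restored directly from the user topic of the same name:

import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

public class TopologySketch {
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
        builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE"); // hypothetical supplier

        // internal store: adds the store, connects it to PROCESS, and marks it for changelogging
        builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");

        // non-internal variant (no managed changelog topic):
        // builder.addStateStore(storeSupplier, false, "PROCESS");

        builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
        return builder;
    }
}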
* - * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig) + * @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig) */ public ProcessorTopology build(Integer topicGroupId) { Set nodeGroup; @@ -467,7 +532,7 @@ public class TopologyBuilder { } for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { if (!stateStoreMap.containsKey(stateStoreName)) { - stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName)); + stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier); } } } else if (factory instanceof SourceNodeFactory) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index e1b4d62fdc1..b3255bbb2a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -43,6 +43,7 @@ public abstract class AbstractTask { protected ProcessorContext processorContext; protected AbstractTask(TaskId id, + String jobId, Collection partitions, ProcessorTopology topology, Consumer consumer, @@ -58,7 +59,7 @@ public abstract class AbstractTask { try { File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString()); // if partitions is null, this is a standby task - this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby); + this.stateMgr = new ProcessorStateManager(jobId, id.partition, stateFile, restoreConsumer, isStandby); } catch (IOException e) { throw new KafkaException("Error while creating the state manager", e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java index 54d5567fc1e..29c67f284bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java @@ -21,9 +21,11 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.ClientState; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; @@ -32,7 +34,21 @@ import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.zookeeper.ZooDefs; + +import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.I0Itec.zkclient.ZkClient; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; 
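The imports above support the interim ZooKeeper-based management of changelog topics. The naming and sizing rule the assignor applies further down can be summarized in a small sketch, assuming the storeChangelogTopic helper and the public TaskId.partition field introduced in this patch:

import java.util.Set;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

public class ChangelogSizing {
    // changelog topic name: <jobId>-<storeName>-changelog
    public static String topicFor(String jobId, String storeName) {
        return ProcessorStateManager.storeChangelogTopic(jobId, storeName);
    }

    // expected partition count: max task partition id + 1 over all tasks using the store
    public static int partitionsFor(Set<TaskId> tasks) {
        int numPartitions = 0;
        for (TaskId task : tasks) {
            if (numPartitions < task.partition + 1)
                numPartitions = task.partition + 1;
        }
        return numPartitions;
    }
}

The assignor then creates, grows, or re-creates the changelog topic so that it has exactly that many partitions before completing the assignment.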
+ +import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,10 +62,146 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class); private StreamThread streamThread; + private int numStandbyReplicas; + private Map topicGroups; private Map> partitionToTaskIds; + private Map> stateNameToTaskIds; private Map> standbyTasks; + + // TODO: the following ZK dependency should be removed after KIP-4 + private static final String ZK_TOPIC_PATH = "/brokers/topics"; + private static final String ZK_BROKER_PATH = "/brokers/ids"; + private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics"; + + private ZkClient zkClient; + + private class ZKStringSerializer implements ZkSerializer { + + @Override + public byte[] serialize(Object data) { + try { + return ((String) data).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new AssertionError(e); + } + } + + @Override + public Object deserialize(byte[] bytes) { + try { + if (bytes == null) + return null; + else + return new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new AssertionError(e); + } + } + } + + private List getBrokers() { + List brokers = new ArrayList<>(); + for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) { + brokers.add(Integer.parseInt(broker)); + } + Collections.sort(brokers); + + log.debug("Read brokers {} from ZK in partition assignor.", brokers); + + return brokers; + } + + @SuppressWarnings("unchecked") + private Map> getTopicMetadata(String topic) { + String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true); + + if (data == null) return null; + + try { + ObjectMapper mapper = new ObjectMapper(); + + Map dataMap = mapper.readValue(data, new TypeReference>() { + + }); + + Map> partitions = (Map>) dataMap.get("partitions"); + + log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic); + + return partitions; + } catch (IOException e) { + throw new KafkaException(e); + } + } + + private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException { + log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions); + + // we always assign leaders to brokers starting at the first one with replication factor 1 + List brokers = getBrokers(); + + Map> assignment = new HashMap<>(); + for (int i = 0; i < numPartitions; i++) { + assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size()))); + } + + // try to write to ZK with open ACL + try { + Map dataMap = new HashMap<>(); + dataMap.put("version", 1); + dataMap.put("partitions", assignment); + + ObjectMapper mapper = new ObjectMapper(); + String data = mapper.writeValueAsString(dataMap); + + zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } catch (JsonProcessingException e) { + throw new KafkaException(e); + } + } + + private void deleteTopic(String topic) throws ZkNodeExistsException { + log.debug("Deleting topic {} from ZK in partition assignor.", topic); + + zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + private void addPartitions(String topic, int numPartitions, Map> existingAssignment) { + log.debug("Adding {} partitions 
topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment); + + // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1 + List brokers = getBrokers(); + + int startIndex = existingAssignment.size(); + + Map> newAssignment = new HashMap<>(existingAssignment); + + for (int i = 0; i < numPartitions; i++) { + newAssignment.put(i + startIndex, Collections.singletonList(brokers.get(i + startIndex) % brokers.size())); + } + + // try to write to ZK with open ACL + try { + Map dataMap = new HashMap<>(); + dataMap.put("version", 1); + dataMap.put("partitions", newAssignment); + + ObjectMapper mapper = new ObjectMapper(); + String data = mapper.writeValueAsString(dataMap); + + zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data); + } catch (JsonProcessingException e) { + throw new KafkaException(e); + } + } + + /** + * We need to have the PartitionAssignor and its StreamThread to be mutually accessible + * since the former needs later's cached metadata while sending subscriptions, + * and the latter needs former's returned assignment when adding tasks. + */ @Override public void configure(Map configs) { numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG); @@ -68,7 +220,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi } streamThread = (StreamThread) o; - streamThread.partitionGrouper.partitionAssignor(this); + streamThread.partitionAssignor(this); + + this.topicGroups = streamThread.builder.topicGroups(); + + if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG)) + zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer()); } @Override @@ -86,7 +243,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi Set prevTasks = streamThread.prevTasks(); Set standbyTasks = streamThread.cachedTasks(); standbyTasks.removeAll(prevTasks); - SubscriptionInfo data = new SubscriptionInfo(streamThread.clientUUID, prevTasks, standbyTasks); + SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks); return new Subscription(new ArrayList<>(topics), data.encode()); } @@ -112,17 +269,17 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); - Set consumers = consumersByClient.get(info.clientUUID); + Set consumers = consumersByClient.get(info.processId); if (consumers == null) { consumers = new HashSet<>(); - consumersByClient.put(info.clientUUID, consumers); + consumersByClient.put(info.processId, consumers); } consumers.add(consumerId); - ClientState state = states.get(info.clientUUID); + ClientState state = states.get(info.processId); if (state == null) { state = new ClientState<>(); - states.put(info.clientUUID, state); + states.put(info.processId, state); } state.prevActiveTasks.addAll(info.prevTasks); @@ -131,21 +288,40 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi state.capacity = state.capacity + 1d; } - // Get partition groups from the partition grouper - Map> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata); + // get the tasks as partition groups from the partition grouper + Map> sourceTopicGroups = new HashMap<>(); + for (Map.Entry entry : topicGroups.entrySet()) { + 
sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics); + } + Map> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata); - states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas); + // add tasks to state topic subscribers + stateNameToTaskIds = new HashMap<>(); + for (TaskId task : partitionsForTask.keySet()) { + for (String stateName : topicGroups.get(task.topicGroupId).stateNames) { + Set tasks = stateNameToTaskIds.get(stateName); + if (tasks == null) { + tasks = new HashSet<>(); + stateNameToTaskIds.put(stateName, tasks); + } + + tasks.add(task); + } + } + + // assign tasks to clients + states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas); Map assignment = new HashMap<>(); for (Map.Entry> entry : consumersByClient.entrySet()) { - UUID uuid = entry.getKey(); + UUID processId = entry.getKey(); Set consumers = entry.getValue(); - ClientState state = states.get(uuid); + ClientState state = states.get(processId); ArrayList taskIds = new ArrayList<>(state.assignedTasks.size()); final int numActiveTasks = state.activeTasks.size(); - for (TaskId id : state.activeTasks) { - taskIds.add(id); + for (TaskId taskId : state.activeTasks) { + taskIds.add(taskId); } for (TaskId id : state.assignedTasks) { if (!state.activeTasks.contains(id)) @@ -164,7 +340,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi for (int j = i; j < numTaskIds; j += numConsumers) { TaskId taskId = taskIds.get(j); if (j < numActiveTasks) { - for (TopicPartition partition : partitionGroups.get(taskId)) { + for (TopicPartition partition : partitionsForTask.get(taskId)) { activePartitions.add(partition); active.add(taskId); } @@ -174,7 +350,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi standbyPartitions = new HashSet<>(); standby.put(taskId, standbyPartitions); } - standbyPartitions.addAll(partitionGroups.get(taskId)); + standbyPartitions.addAll(partitionsForTask.get(taskId)); } } @@ -187,6 +363,63 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi } } + // if ZK is specified, get the tasks for each state topic and validate the topic partitions + if (zkClient != null) { + log.debug("Starting to validate changelog topics in partition assignor."); + + for (Map.Entry> entry : stateNameToTaskIds.entrySet()) { + String topic = streamThread.jobId + "-" + entry.getKey() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX; + + // the expected number of partitions is the max value of TaskId.partition + 1 + int numPartitions = 0; + for (TaskId task : entry.getValue()) { + if (numPartitions < task.partition + 1) + numPartitions = task.partition + 1; + } + + boolean topicNotReady = true; + + while (topicNotReady) { + Map> topicMetadata = getTopicMetadata(topic); + + // if topic does not exist, create it + if (topicMetadata == null) { + try { + createTopic(topic, numPartitions); + } catch (ZkNodeExistsException e) { + // ignore and continue + } + } else { + if (topicMetadata.size() > numPartitions) { + // else if topic exists with more #.partitions than needed, delete in order to re-create it + try { + deleteTopic(topic); + } catch (ZkNodeExistsException e) { + // ignore and continue + } + } else if (topicMetadata.size() < numPartitions) { + // else if topic exists with less #.partitions than needed, add partitions + try { + addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata); + } catch 
(ZkNoNodeException e) { + // ignore and continue + } + } + + topicNotReady = false; + } + } + + // wait until the topic metadata has been propagated to all brokers + List partitions; + do { + partitions = streamThread.restoreConsumer.partitionsFor(topic); + } while (partitions == null || partitions.size() != numPartitions); + } + + log.info("Completed validating changelog topics in partition assignor."); + } + return assignment; } @@ -220,7 +453,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi this.partitionToTaskIds = partitionToTaskIds; } - public Set taskIds(TopicPartition partition) { + /* For Test Only */ + public Set tasksForState(String stateName) { + return stateNameToTaskIds.get(stateName); + } + + public Set tasksForPartition(TopicPartition partition) { return partitionToTaskIds.get(partition); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 1321cc519f1..3429df38708 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -118,11 +118,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S } @Override - public void register(StateStore store, StateRestoreCallback stateRestoreCallback) { + public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { if (initialized) throw new KafkaException("Can only create state stores during initialization."); - stateMgr.register(store, stateRestoreCallback); + stateMgr.register(store, loggingEnabled, stateRestoreCallback); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 4cff02dd7de..579d245fbae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -19,10 +19,11 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.OffsetCheckpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +44,11 @@ public class ProcessorStateManager { private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class); + public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; public static final String CHECKPOINT_FILE_NAME = ".checkpoint"; public static final String LOCK_FILE_NAME = ".lock"; + private final String jobId; private final int partition; private final File baseDir; private final FileLock directoryLock; @@ -55,9 +58,10 @@ public class ProcessorStateManager { private final Map checkpointedOffsets; private final Map offsetLimits; private final 
boolean isStandby; - private final Map restoreCallbacks; // used for standby tasks + private final Map restoreCallbacks; // used for standby tasks, keyed by state topic name - public ProcessorStateManager(int partition, File baseDir, Consumer restoreConsumer, boolean isStandby) throws IOException { + public ProcessorStateManager(String jobId, int partition, File baseDir, Consumer restoreConsumer, boolean isStandby) throws IOException { + this.jobId = jobId; this.partition = partition; this.baseDir = baseDir; this.stores = new HashMap<>(); @@ -90,6 +94,10 @@ public class ProcessorStateManager { } } + public static String storeChangelogTopic(String jobId, String storeName) { + return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; + } + public static FileLock lockStateDirectory(File stateDir) throws IOException { File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME); FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel(); @@ -104,7 +112,7 @@ public class ProcessorStateManager { return this.baseDir; } - public void register(StateStore store, StateRestoreCallback stateRestoreCallback) { + public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { if (store.name().equals(CHECKPOINT_FILE_NAME)) throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME); @@ -112,44 +120,52 @@ public class ProcessorStateManager { throw new IllegalArgumentException("Store " + store.name() + " has already been registered."); // check that the underlying change log topic exist or not - if (restoreConsumer.listTopics().containsKey(store.name())) { - boolean partitionNotFound = true; - for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) { + String topic; + if (loggingEnabled) + topic = storeChangelogTopic(this.jobId, store.name()); + else topic = store.name(); + + // block until the partition is ready for this state changelog topic or time has elapsed + boolean partitionNotFound = true; + long startTime = System.currentTimeMillis(); + long waitTime = 5000L; // hard-code the value since we should not block after KIP-4 + + do { + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + // ignore + } + + for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(topic)) { if (partitionInfo.partition() == partition) { partitionNotFound = false; break; } } + } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime); - if (partitionNotFound) - throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition " + partition); - - } else { - throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet"); - } + if (partitionNotFound) + throw new KafkaException("Store " + store.name() + "'s change log does not contain partition " + partition); this.stores.put(store.name(), store); if (isStandby) { if (store.persistent()) - restoreCallbacks.put(store.name(), stateRestoreCallback); + restoreCallbacks.put(topic, stateRestoreCallback); } else { restoreActiveState(store, stateRestoreCallback); } } private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) { - - if (store == null) - throw new IllegalArgumentException("Store " + store.name() + " has not been registered."); - // ---- try to restore the state from change-log ---- // // subscribe to the store's partition if (!restoreConsumer.subscription().isEmpty()) { throw new 
IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand"); } - TopicPartition storePartition = new TopicPartition(store.name(), partition); + TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), partition); restoreConsumer.assign(Collections.singletonList(storePartition)); try { @@ -195,8 +211,8 @@ public class ProcessorStateManager { Map partitionsAndOffsets = new HashMap<>(); for (Map.Entry entry : restoreCallbacks.entrySet()) { - String storeName = entry.getKey(); - TopicPartition storePartition = new TopicPartition(storeName, partition); + String topicName = entry.getKey(); + TopicPartition storePartition = new TopicPartition(topicName, partition); if (checkpointedOffsets.containsKey(storePartition)) { partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition)); @@ -212,6 +228,7 @@ public class ProcessorStateManager { List> remainingRecords = null; // restore states from changelog records + StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic()); long lastOffset = -1L; @@ -276,7 +293,7 @@ public class ProcessorStateManager { Map checkpointOffsets = new HashMap<>(); for (String storeName : stores.keySet()) { - TopicPartition part = new TopicPartition(storeName, partition); + TopicPartition part = new TopicPartition(storeChangelogTopic(jobId, storeName), partition); // only checkpoint the offset to the offsets file if it is persistent; if (stores.get(storeName).persistent()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index ea95300b288..e0583e38e45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -110,11 +110,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup } @Override - public void register(StateStore store, StateRestoreCallback stateRestoreCallback) { + public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { if (initialized) throw new KafkaException("Can only create state stores during initialization."); - stateMgr.register(store, stateRestoreCallback); + stateMgr.register(store, loggingEnabled, stateRestoreCallback); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index d0d8493d5cb..4cc4ea47740 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -45,19 +45,23 @@ public class StandbyTask extends AbstractTask { * Create {@link StandbyTask} with its assigned partitions * * @param id the ID of this task - * @param restoreConsumer the instance of {@link Consumer} used when restoring state + * @param jobId the ID of the job + * @param partitions the collection of assigned {@link TopicPartition} * @param topology the instance of {@link ProcessorTopology} + * @param consumer the instance of {@link Consumer} + * @param restoreConsumer the instance of {@link Consumer} used when restoring state * @param config the {@link StreamingConfig} specified by the user * @param metrics the {@link 
StreamingMetrics} created by the thread */ public StandbyTask(TaskId id, + String jobId, Collection partitions, ProcessorTopology topology, Consumer consumer, Consumer restoreConsumer, StreamingConfig config, StreamingMetrics metrics) { - super(id, partitions, topology, consumer, restoreConsumer, config, true); + super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true); // initialize the topology with its own context this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 24c450e46ee..2e58ad59f8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -61,15 +61,17 @@ public class StreamTask extends AbstractTask implements Punctuator { * Create {@link StreamTask} with its assigned partitions * * @param id the ID of this task + * @param jobId the ID of the job + * @param partitions the collection of assigned {@link TopicPartition} + * @param topology the instance of {@link ProcessorTopology} * @param consumer the instance of {@link Consumer} * @param producer the instance of {@link Producer} * @param restoreConsumer the instance of {@link Consumer} used when restoring state - * @param partitions the collection of assigned {@link TopicPartition} - * @param topology the instance of {@link ProcessorTopology} * @param config the {@link StreamingConfig} specified by the user * @param metrics the {@link StreamingMetrics} created by the thread */ public StreamTask(TaskId id, + String jobId, Collection partitions, ProcessorTopology topology, Consumer consumer, @@ -77,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator { Consumer restoreConsumer, StreamingConfig config, StreamingMetrics metrics) { - super(id, partitions, topology, consumer, restoreConsumer, config, false); + super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false); this.punctuationQueue = new PunctuationQueue(); this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c77a027df6e..4d1ef439465 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -70,7 +70,9 @@ public class StreamThread extends Thread { private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1); public final PartitionGrouper partitionGrouper; - public final UUID clientUUID; + public final String jobId; + public final String clientId; + public final UUID processId; protected final StreamingConfig config; protected final TopologyBuilder builder; @@ -83,7 +85,6 @@ public class StreamThread extends Thread { private final Map activeTasks; private final Map standbyTasks; private final Set prevTasks; - private final String clientId; private final Time time; private final File stateDir; private final long pollTimeMs; @@ -92,6 +93,8 @@ public class StreamThread extends Thread { private final long totalRecordsToProcess; private final StreamingMetricsImpl sensors; + private 
KafkaStreamingPartitionAssignor partitionAssignor = null; + private long lastClean; private long lastCommit; private long recordsProcessed; @@ -118,11 +121,12 @@ public class StreamThread extends Thread { public StreamThread(TopologyBuilder builder, StreamingConfig config, + String jobId, String clientId, - UUID clientUUID, + UUID processId, Metrics metrics, Time time) throws Exception { - this(builder, config, null , null, null, clientId, clientUUID, metrics, time); + this(builder, config, null , null, null, jobId, clientId, processId, metrics, time); } StreamThread(TopologyBuilder builder, @@ -130,19 +134,20 @@ public class StreamThread extends Thread { Producer producer, Consumer consumer, Consumer restoreConsumer, + String jobId, String clientId, - UUID clientUUID, + UUID processId, Metrics metrics, Time time) throws Exception { super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement()); + this.jobId = jobId; this.config = config; this.builder = builder; this.sourceTopics = builder.sourceTopics(); this.clientId = clientId; - this.clientUUID = clientUUID; + this.processId = processId; this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); - this.partitionGrouper.topicGroups(builder.topicGroups()); // set the producer and consumer clients this.producer = (producer != null) ? producer : createProducer(); @@ -175,23 +180,27 @@ public class StreamThread extends Thread { this.running = new AtomicBoolean(true); } + public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) { + this.partitionAssignor = partitionAssignor; + } + private Producer createProducer() { log.info("Creating producer client for stream thread [" + this.getName() + "]"); - return new KafkaProducer<>(config.getProducerConfigs(), + return new KafkaProducer<>(config.getProducerConfigs(this.clientId), new ByteArraySerializer(), new ByteArraySerializer()); } private Consumer createConsumer() { log.info("Creating consumer client for stream thread [" + this.getName() + "]"); - return new KafkaConsumer<>(config.getConsumerConfigs(this), + return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId), new ByteArrayDeserializer(), new ByteArrayDeserializer()); } private Consumer createRestoreConsumer() { log.info("Creating restore consumer client for stream thread [" + this.getName() + "]"); - return new KafkaConsumer<>(config.getRestoreConsumerConfigs(), + return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId), new ByteArrayDeserializer(), new ByteArrayDeserializer()); } @@ -516,14 +525,17 @@ public class StreamThread extends Thread { ProcessorTopology topology = builder.build(id.topicGroupId); - return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors); + return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors); } private void addStreamTasks(Collection assignment) { + if (partitionAssignor == null) + throw new KafkaException("Partition assignor has not been initialized while adding stream tasks: this should not happen."); + HashMap> partitionsForTask = new HashMap<>(); for (TopicPartition partition : assignment) { - Set taskIds = partitionGrouper.taskIds(partition); + Set taskIds = partitionAssignor.tasksForPartition(partition); for (TaskId taskId : taskIds) { Set partitions = partitionsForTask.get(taskId); if (partitions == null) { @@ -574,17 +586,20 @@ public class StreamThread extends 
Thread { ProcessorTopology topology = builder.build(id.topicGroupId); if (!topology.stateStoreSuppliers().isEmpty()) { - return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors); + return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors); } else { return null; } } private void addStandbyTasks() { + if (partitionAssignor == null) + throw new KafkaException("Partition assignor has not been initialized while adding standby tasks: this should not happen."); + Map checkpointedOffsets = new HashMap<>(); // create the standby tasks - for (Map.Entry> entry : partitionGrouper.standbyTasks().entrySet()) { + for (Map.Entry> entry : partitionAssignor.standbyTasks().entrySet()) { TaskId taskId = entry.getKey(); Set partitions = entry.getValue(); StandbyTask task = createStandbyTask(taskId, partitions); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index 54042b9ce8a..43009a1a7a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -30,30 +30,32 @@ public class SubscriptionInfo { private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); + private static final int CURRENT_VERSION = 1; + public final int version; - public final UUID clientUUID; + public final UUID processId; public final Set prevTasks; public final Set standbyTasks; - public SubscriptionInfo(UUID clientUUID, Set prevTasks, Set standbyTasks) { - this(1, clientUUID, prevTasks, standbyTasks); + public SubscriptionInfo(UUID processId, Set prevTasks, Set standbyTasks) { + this(CURRENT_VERSION, processId, prevTasks, standbyTasks); } - private SubscriptionInfo(int version, UUID clientUUID, Set prevTasks, Set standbyTasks) { + private SubscriptionInfo(int version, UUID processId, Set prevTasks, Set standbyTasks) { this.version = version; - this.clientUUID = clientUUID; + this.processId = processId; this.prevTasks = prevTasks; this.standbyTasks = standbyTasks; } public ByteBuffer encode() { - if (version == 1) { - ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8); + if (version == CURRENT_VERSION) { + ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8); // version - buf.putInt(1); + buf.putInt(version); // encode client UUID - buf.putLong(clientUUID.getMostSignificantBits()); - buf.putLong(clientUUID.getLeastSignificantBits()); + buf.putLong(processId.getMostSignificantBits()); + buf.putLong(processId.getLeastSignificantBits()); // encode ids of previously running tasks buf.putInt(prevTasks.size()); for (TaskId id : prevTasks) { @@ -81,9 +83,9 @@ public class SubscriptionInfo { // Decode version int version = data.getInt(); - if (version == 1) { + if (version == CURRENT_VERSION) { // Decode client UUID - UUID clientUUID = new UUID(data.getLong(), data.getLong()); + UUID processId = new UUID(data.getLong(), data.getLong()); // Decode previously active tasks Set prevTasks = new HashSet<>(); int numPrevs = data.getInt(); @@ -98,7 +100,7 @@ public class SubscriptionInfo { standbyTasks.add(TaskId.readFrom(data)); } - return new SubscriptionInfo(version, clientUUID, prevTasks, 
standbyTasks); + return new SubscriptionInfo(version, processId, prevTasks, standbyTasks); } else { TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version); @@ -109,7 +111,7 @@ public class SubscriptionInfo { @Override public int hashCode() { - return version ^ clientUUID.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); + return version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); } @Override @@ -117,7 +119,7 @@ public class SubscriptionInfo { if (o instanceof SubscriptionInfo) { SubscriptionInfo other = (SubscriptionInfo) o; return this.version == other.version && - this.clientUUID.equals(other.clientUUID) && + this.processId.equals(other.processId) && this.prevTasks.equals(other.prevTasks) && this.standbyTasks.equals(other.standbyTasks); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java index 8aed6b8403a..d75e7e6dc0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java @@ -88,7 +88,7 @@ public class MeteredKeyValueStore implements KeyValueStore { final Deserializer keyDeserializer = serialization.keyDeserializer(); final Deserializer valDeserializer = serialization.valueDeserializer(); - context.register(this, new StateRestoreCallback() { + context.register(this, loggingEnabled, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { inner.put(keyDeserializer.deserialize(name, key), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java index 40ca9f59d50..029d72fe1cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java @@ -56,7 +56,6 @@ public class RocksDBStore implements KeyValueStore { private Serdes serdes; private ProcessorContext context; - private String dbName; private String dirName; private RocksDB db; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java index a491e4acc66..3b3fc9b9c2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -37,13 +38,11 @@ public class StreamingConfigTest { private Properties props = new Properties(); private StreamingConfig streamingConfig; - private StreamThread streamThreadPlaceHolder = null; + private StreamThread streamThreadPlaceHolder; @Before public void setUp() { - props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job"); - props.put("group.id", "test-consumer-group"); props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); @@ -53,18 +52,24 @@ public 
class StreamingConfigTest { streamingConfig = new StreamingConfig(props); } - + @Test + public void testGetProducerConfigs() throws Exception { + Map returnedProps = streamingConfig.getProducerConfigs("client"); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer"); + } @Test public void testGetConsumerConfigs() throws Exception { - Map returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder); - assertEquals(returnedProps.get("group.id"), "test-consumer-group"); + Map returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client"); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer"); + assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job"); } @Test public void testGetRestoreConsumerConfigs() throws Exception { - Map returnedProps = streamingConfig.getRestoreConsumerConfigs(); - assertNull(returnedProps.get("group.id")); + Map returnedProps = streamingConfig.getRestoreConsumerConfigs("client"); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer"); + assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java index 2a5ca9b1d09..c11d0c13449 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; + import static org.apache.kafka.common.utils.Utils.mkSet; import org.junit.Test; @@ -43,42 +44,39 @@ public class DefaultPartitionGrouperTest { new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.emptySet()); + private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.emptySet()); @Test public void testGrouping() { PartitionGrouper grouper = new DefaultPartitionGrouper(); int topicGroupId; - Map> expected; + Map> expectedPartitionsForTask; Map> topicGroups; topicGroups = new HashMap<>(); topicGroups.put(0, mkSet("topic1")); topicGroups.put(1, mkSet("topic2")); - grouper.topicGroups(topicGroups); - expected = new HashMap<>(); + expectedPartitionsForTask = new HashMap<>(); topicGroupId = 0; - expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0))); - expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1))); - expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2))); + expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0))); + expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1))); + expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2))); topicGroupId++; - expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0))); - expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1))); + expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0))); + 
expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1))); - assertEquals(expected, grouper.partitionGroups(metadata)); + assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata)); topicGroups = new HashMap<>(); topicGroups.put(0, mkSet("topic1", "topic2")); - grouper.topicGroups(topicGroups); - expected = new HashMap<>(); + expectedPartitionsForTask = new HashMap<>(); topicGroupId = 0; - expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))); - expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1))); - expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2))); + expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))); + expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1))); + expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2))); - assertEquals(expected, grouper.partitionGroups(metadata)); + assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata)); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index f6924ade85a..af0b3c96e43 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -19,11 +19,13 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.junit.Test; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -208,12 +210,12 @@ public class TopologyBuilderTest { builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); - Map> topicGroups = builder.topicGroups(); + Map topicGroups = builder.topicGroups(); - Map> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2")); - expectedTopicGroups.put(1, mkSet("topic-3", "topic-4")); - expectedTopicGroups.put(2, mkSet("topic-5")); + Map expectedTopicGroups = new HashMap<>(); + expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptySet())); + expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.emptySet())); + expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.emptySet())); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -235,18 +237,23 @@ public class TopologyBuilderTest { builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2"); - builder.addStateStore(new MockStateStoreSupplier("strore-1", false), "processor-1", "processor-2"); + builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2"); 
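Note: in the TopologyBuilderTest hunks around this point, builder.topicGroups() now returns a map from group id to TopologyBuilder.TopicsInfo, pairing each sub-topology's source topics with the state store names connected to it. The TopicsInfo class itself is not part of this excerpt; what follows is only a minimal sketch of the shape implied by the test usage (two sets plus value equality), with field names assumed, not the actual implementation.

    import java.util.Set;

    // Hypothetical sketch; in the real patch this is a nested class of TopologyBuilder.
    public class TopicsInfo {
        public final Set<String> sourceTopics;     // topics consumed by this topic group
        public final Set<String> stateStoreNames;  // stores attached to processors in this group

        public TopicsInfo(Set<String> sourceTopics, Set<String> stateStoreNames) {
            this.sourceTopics = sourceTopics;
            this.stateStoreNames = stateStoreNames;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicsInfo))
                return false;
            TopicsInfo other = (TopicsInfo) o;
            return sourceTopics.equals(other.sourceTopics) && stateStoreNames.equals(other.stateStoreNames);
        }

        @Override
        public int hashCode() {
            return sourceTopics.hashCode() ^ stateStoreNames.hashCode();
        }
    }

The assignor needs the store names so it can later map each store's changelog topic to the tasks of that group (see the tasksForState assertions further down).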
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3"); builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4"); - builder.addStateStore(new MockStateStoreSupplier("strore-2", false), "processor-3", "processor-4"); + builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4"); - Map> topicGroups = builder.topicGroups(); + builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5"); + StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false); + builder.addStateStore(supplier); + builder.connectProcessorAndStateStores("processor-5", "store-3"); - Map> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2")); - expectedTopicGroups.put(1, mkSet("topic-3", "topic-4")); - expectedTopicGroups.put(2, mkSet("topic-5")); + Map topicGroups = builder.topicGroups(); + + Map expectedTopicGroups = new HashMap<>(); + expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("store-1"))); + expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), mkSet("store-2"))); + expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), mkSet("store-3"))); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java index 43ffa7bd09b..92d7b6aa123 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java @@ -35,9 +35,9 @@ import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockStateStoreSupplier; import org.junit.Test; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -59,7 +59,10 @@ public class KafkaStreamingPartitionAssignorTest { private TopicPartition t2p0 = new TopicPartition("topic2", 0); private TopicPartition t2p1 = new TopicPartition("topic2", 1); private TopicPartition t2p2 = new TopicPartition("topic2", 2); - private TopicPartition t2p3 = new TopicPartition("topic2", 3); + private TopicPartition t3p0 = new TopicPartition("topic3", 0); + private TopicPartition t3p1 = new TopicPartition("topic3", 1); + private TopicPartition t3p2 = new TopicPartition("topic3", 2); + private TopicPartition t3p3 = new TopicPartition("topic3", 3); private Set allTopics = Utils.mkSet("topic1", "topic2"); @@ -69,27 +72,15 @@ public class KafkaStreamingPartitionAssignorTest { new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]) + new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), + new 
PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) ); private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.emptySet()); - private ByteBuffer subscriptionUserData() { - UUID uuid = UUID.randomUUID(); - ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + 4); - // version - buf.putInt(1); - // encode client clientUUID - buf.putLong(uuid.getMostSignificantBits()); - buf.putLong(uuid.getLeastSignificantBits()); - // previously running tasks - buf.putInt(0); - // cached tasks - buf.putInt(0); - buf.rewind(); - return buf; - } - private final TaskId task0 = new TaskId(0, 0); private final TaskId task1 = new TaskId(0, 1); private final TaskId task2 = new TaskId(0, 2); @@ -131,8 +122,9 @@ public class KafkaStreamingPartitionAssignorTest { new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)); - UUID uuid = UUID.randomUUID(); - StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()) { + String clientId = "client-id"; + UUID processId = UUID.randomUUID(); + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) { @Override public Set prevTasks() { return prevTasks; @@ -144,7 +136,7 @@ public class KafkaStreamingPartitionAssignorTest { }; KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread)); + partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId)); PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); @@ -154,12 +146,12 @@ public class KafkaStreamingPartitionAssignorTest { Set standbyTasks = new HashSet<>(cachedTasks); standbyTasks.removeAll(prevTasks); - SubscriptionInfo info = new SubscriptionInfo(uuid, prevTasks, standbyTasks); + SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks); assertEquals(info.encode(), subscription.userData()); } @Test - public void testAssign() throws Exception { + public void testAssignBasic() throws Exception { StreamingConfig config = new StreamingConfig(configProps()); MockProducer producer = new MockProducer<>(true, serializer, serializer); @@ -182,11 +174,13 @@ public class KafkaStreamingPartitionAssignorTest { UUID uuid1 = UUID.randomUUID(); UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; - StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime()); + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10)); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", @@ -199,7 +193,6 @@ public class KafkaStreamingPartitionAssignorTest { Map assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partitions - assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), 
Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions()))); assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); @@ -216,8 +209,7 @@ public class KafkaStreamingPartitionAssignorTest { AssignmentInfo info11 = checkAssignment(assignments.get("consumer11")); allActiveTasks.addAll(info11.activeTasks); - // check active tasks assigned to the first client - assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); + assertEquals(Utils.mkSet(task0, task1), allActiveTasks); // the third consumer AssignmentInfo info20 = checkAssignment(assignments.get("consumer20")); @@ -227,7 +219,135 @@ public class KafkaStreamingPartitionAssignorTest { assertEquals(allTasks, new HashSet<>(allActiveTasks)); assertEquals(3, allActiveTasks.size()); - assertEquals(allTasks, new HashSet<>(allActiveTasks)); + assertEquals(allTasks, allActiveTasks); + } + + @Test + public void testAssignWithNewTasks() throws Exception { + StreamingConfig config = new StreamingConfig(configProps()); + + MockProducer producer = new MockProducer<>(true, serializer, serializer); + MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + builder.addSource("source3", "topic3"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); + List topics = Utils.mkList("topic1", "topic2", "topic3"); + Set allTasks = Utils.mkSet(task0, task1, task2, task3); + + // assuming that previous tasks do not have topic3 + final Set prevTasks10 = Utils.mkSet(task0); + final Set prevTasks11 = Utils.mkSet(task1); + final Set prevTasks20 = Utils.mkSet(task2); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); + + KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + + Map subscriptions = new HashMap<>(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet()).encode())); + subscriptions.put("consumer11", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.emptySet()).encode())); + subscriptions.put("consumer20", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet()).encode())); + + Map assignments = partitionAssignor.assign(metadata, subscriptions); + + // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match + // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and + // then later ones will be re-assigned to other hosts due to load balancing + Set allActiveTasks = new HashSet<>(); + Set allPartitions = new HashSet<>(); + AssignmentInfo info; + + info = AssignmentInfo.decode(assignments.get("consumer10").userData()); + allActiveTasks.addAll(info.activeTasks); + 
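Note: each of these assignment tests follows the same pattern: decode the AssignmentInfo carried in every member's userData, then union the active tasks and the raw assigned partitions before asserting on the totals. A small helper in that style is sketched below; it is illustrative only and assumes nothing beyond the decode()/activeTasks/partitions() accessors already used in the test.

    // Hypothetical test helper: aggregate tasks and partitions across all members of an assignment.
    private static void collectAssignment(Map<String, PartitionAssignor.Assignment> assignments,
                                          Set<TaskId> allActiveTasks,
                                          Set<TopicPartition> allPartitions) {
        for (PartitionAssignor.Assignment assignment : assignments.values()) {
            AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); // task ids encoded by the assignor
            allActiveTasks.addAll(info.activeTasks);
            allPartitions.addAll(assignment.partitions());                      // topic-partitions given to this member
        }
    }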
allPartitions.addAll(assignments.get("consumer10").partitions()); + + info = AssignmentInfo.decode(assignments.get("consumer11").userData()); + allActiveTasks.addAll(info.activeTasks); + allPartitions.addAll(assignments.get("consumer11").partitions()); + + info = AssignmentInfo.decode(assignments.get("consumer20").userData()); + allActiveTasks.addAll(info.activeTasks); + allPartitions.addAll(assignments.get("consumer20").partitions()); + + assertEquals(allTasks, allActiveTasks); + assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions); + } + + @Test + public void testAssignWithStates() throws Exception { + StreamingConfig config = new StreamingConfig(configProps()); + + MockProducer producer = new MockProducer<>(true, serializer, serializer); + MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1"); + builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1"); + + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2"); + builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2"); + builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2"); + + List topics = Utils.mkList("topic1", "topic2"); + + TaskId task00 = new TaskId(0, 0); + TaskId task01 = new TaskId(0, 1); + TaskId task02 = new TaskId(0, 2); + TaskId task10 = new TaskId(1, 0); + TaskId task11 = new TaskId(1, 1); + TaskId task12 = new TaskId(1, 2); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); + + KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + + Map subscriptions = new HashMap<>(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet()).encode())); + subscriptions.put("consumer11", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet()).encode())); + subscriptions.put("consumer20", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet()).encode())); + + Map assignments = partitionAssignor.assign(metadata, subscriptions); + + // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match + assertEquals(2, assignments.get("consumer10").partitions().size()); + assertEquals(2, assignments.get("consumer11").partitions().size()); + assertEquals(2, assignments.get("consumer20").partitions().size()); + + assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size()); + assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size()); + assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size()); + 
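Note: the userData each consumer attaches to its subscription in these tests is the SubscriptionInfo encoding from earlier in this patch: a 4-byte version, the 16-byte process id (two longs), then each task-id set written as a 4-byte count followed by 8 bytes per TaskId (presumably the topic group id and partition as two ints). Purely as an illustration, the buffer size therefore works out to:

    // Illustrative only: mirrors the ByteBuffer.allocate(...) arithmetic in SubscriptionInfo.encode().
    static int subscriptionInfoSize(int numPrevTasks, int numStandbyTasks) {
        return 4                         // version
             + 16                        // process id
             + 4 + numPrevTasks * 8      // count + one TaskId per previously active task
             + 4 + numStandbyTasks * 8;  // count + one TaskId per standby task
    }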
+ // check tasks for state topics + assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1")); + assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2")); + assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3")); } @Test @@ -257,11 +377,13 @@ public class KafkaStreamingPartitionAssignorTest { UUID uuid1 = UUID.randomUUID(); UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; - StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime()); + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10)); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", @@ -298,10 +420,10 @@ public class KafkaStreamingPartitionAssignorTest { // all task ids are in the active tasks and also in the standby tasks assertEquals(3, allActiveTasks.size()); - assertEquals(allTasks, new HashSet<>(allActiveTasks)); + assertEquals(allTasks, allActiveTasks); assertEquals(3, allStandbyTasks.size()); - assertEquals(allTasks, new HashSet<>(allStandbyTasks)); + assertEquals(allTasks, allStandbyTasks); } private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) { @@ -354,17 +476,20 @@ public class KafkaStreamingPartitionAssignorTest { MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); MockConsumer mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + TopicPartition t2p3 = new TopicPartition("topic2", 3); + TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); UUID uuid = UUID.randomUUID(); + String client1 = "client1"; - StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()); + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime()); KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread)); + partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1)); List activeTaskList = Utils.mkList(task0, task3); Map> standbyTasks = new HashMap<>(); @@ -375,8 +500,8 @@ public class KafkaStreamingPartitionAssignorTest { PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode()); partitionAssignor.onAssignment(assignment); - assertEquals(Utils.mkSet(task0), partitionAssignor.taskIds(t1p0)); - assertEquals(Utils.mkSet(task3), partitionAssignor.taskIds(t2p3)); + assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0)); + assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3)); assertEquals(standbyTasks, partitionAssignor.standbyTasks()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 17bc9da3b20..7e5ce491ab2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -178,14 +179,21 @@ public class ProcessorStateManagerTest { } } + private final String jobId = "test-job"; + private final String stateDir = "test"; + private final String persistentStoreName = "persistentStore"; + private final String nonPersistentStoreName = "nonPersistentStore"; + private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, persistentStoreName); + private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, nonPersistentStoreName); + @Test public void testLockStateDirectory() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); + File baseDir = Files.createTempDirectory(stateDir).toFile(); try { FileLock lock; // the state manager locks the directory - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false); + ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false); try { // this should not get the lock @@ -208,15 +216,15 @@ public class ProcessorStateManagerTest { } } - @Test(expected = IllegalStateException.class) + @Test(expected = KafkaException.class) public void testNoTopic() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); + File baseDir = Files.createTempDirectory(stateDir).toFile(); try { - MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false); + MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false); + ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false); try { - stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback); + stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); } finally { stateMgr.close(Collections.emptyMap()); } @@ -227,41 +235,42 @@ public class ProcessorStateManagerTest { @Test public void testRegisterPersistentStore() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); + File baseDir = Files.createTempDirectory(stateDir).toFile(); try { long lastCheckpointedOffset = 10L; + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset)); + checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset)); MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - 
restoreConsumer.updatePartitions("persistentStore", Utils.mkList( - new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0]) + ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false); + + restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( + new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0]) )); - TopicPartition partition = new TopicPartition("persistentStore", 2); + TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2); restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); - MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - - ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false); + MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); // persistent store try { restoreConsumer.reset(); ArrayList expectedKeys = new ArrayList<>(); - long offset = -1L; + long offset; for (int i = 1; i <= 3; i++) { offset = (long) i; int key = i * 10; expectedKeys.add(key); restoreConsumer.bufferRecord( - new ConsumerRecord<>("persistentStore", 2, offset, key, 0) + new ConsumerRecord<>(persistentStoreTopicName, 2, offset, key, 0) ); } - stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); - assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition); + assertEquals(new TopicPartition(persistentStoreTopicName, 2), restoreConsumer.assignedPartition); assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset); assertFalse(restoreConsumer.seekToBeginingCalled); assertTrue(restoreConsumer.seekToEndCalled); @@ -278,24 +287,26 @@ public class ProcessorStateManagerTest { @Test public void testRegisterNonPersistentStore() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); + File baseDir = Files.createTempDirectory(stateDir).toFile(); try { long lastCheckpointedOffset = 10L; - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset)); MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList( - new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0]) + ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false); + + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); + checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset)); + + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0]) )); - 
TopicPartition partition = new TopicPartition("persistentStore", 2); + TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2); restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store + MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false); try { restoreConsumer.reset(); @@ -306,13 +317,13 @@ public class ProcessorStateManagerTest { int key = i; expectedKeys.add(i); restoreConsumer.bufferRecord( - new ConsumerRecord<>("nonPersistentStore", 2, offset, key, 0) + new ConsumerRecord<>(nonPersistentStoreTopicName, 2, offset, key, 0) ); } - stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); - assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition); + assertEquals(new TopicPartition(nonPersistentStoreTopicName, 2), restoreConsumer.assignedPartition); assertEquals(0L, restoreConsumer.seekOffset); assertTrue(restoreConsumer.seekToBeginingCalled); assertTrue(restoreConsumer.seekToEndCalled); @@ -328,37 +339,44 @@ public class ProcessorStateManagerTest { @Test public void testChangeLogOffsets() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); + File baseDir = Files.createTempDirectory(stateDir).toFile(); try { long lastCheckpointedOffset = 10L; + String storeName1 = "store1"; + String storeName2 = "store2"; + + String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1); + String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2); + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(new TopicPartition("store1", 0), lastCheckpointedOffset)); + checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("store1", Utils.mkList( - new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]) + ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, baseDir, restoreConsumer, true); // standby + + restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList( + new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0]) )); - restoreConsumer.updatePartitions("store2", Utils.mkList( - new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]) + restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList( + new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0]) )); - TopicPartition partition1 = new TopicPartition("store1", 0); - TopicPartition partition2 = new TopicPartition("store2", 0); + TopicPartition partition1 = new TopicPartition(storeTopicName1, 0); + TopicPartition partition2 = new TopicPartition(storeTopicName2, 0); Map endOffsets = new HashMap<>(); endOffsets.put(partition1, 13L); endOffsets.put(partition2, 17L); restoreConsumer.updateEndOffsets(endOffsets); - MockStateStoreSupplier.MockStateStore store1 = new 
MockStateStoreSupplier.MockStateStore("store1", true); - MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore("store2", true); + MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true); + MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true); - ProcessorStateManager stateMgr = new ProcessorStateManager(0, baseDir, restoreConsumer, true); // standby try { restoreConsumer.reset(); - stateMgr.register(store1, store1.stateRestoreCallback); - stateMgr.register(store2, store2.stateRestoreCallback); + stateMgr.register(store1, true, store1.stateRestoreCallback); + stateMgr.register(store2, true, store2.stateRestoreCallback); Map changeLogOffsets = stateMgr.checkpointedOffsets(); @@ -379,21 +397,22 @@ public class ProcessorStateManagerTest { @Test public void testGetStore() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); + File baseDir = Files.createTempDirectory(stateDir).toFile(); try { MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("mockStore", Utils.mkList( - new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0]) + ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false); + + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) )); - MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false); + MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false); try { - stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback); + stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); assertNull(stateMgr.getStore("noSuchStore")); - assertEquals(mockStateStore, stateMgr.getStore("mockStore")); + assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName)); } finally { stateMgr.close(Collections.emptyMap()); @@ -405,7 +424,7 @@ public class ProcessorStateManagerTest { @Test public void testClose() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); + File baseDir = Files.createTempDirectory(stateDir).toFile(); File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME); try { // write an empty checkpoint file @@ -413,32 +432,33 @@ public class ProcessorStateManagerTest { oldCheckpoint.write(Collections.emptyMap()); MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions("persistentStore", Utils.mkList( - new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]) + ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false); + + restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( + new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) )); - restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList( - new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]) + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, 
Node.noNode(), new Node[0], new Node[0]) )); // set up ack'ed offsets HashMap ackedOffsets = new HashMap<>(); - ackedOffsets.put(new TopicPartition("persistentStore", 1), 123L); - ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L); - ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L); + ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L); + ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L); + ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(jobId, "otherTopic"), 1), 789L); - MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); + MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); + MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false); try { // make sure the checkpoint file is deleted assertFalse(checkpointFile.exists()); restoreConsumer.reset(); - stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); restoreConsumer.reset(); - stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); } finally { // close the state manager with the ack'ed offsets stateMgr.close(ackedOffsets); @@ -455,7 +475,7 @@ public class ProcessorStateManagerTest { OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile); Map checkpointedOffsets = newCheckpoint.read(); assertEquals(1, checkpointedOffsets.size()); - assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition("persistentStore", 1))); + assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); } finally { Utils.delete(baseDir); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 9a43e4663b4..00b983d71a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -53,16 +53,22 @@ public class StandbyTaskTest { private final Serializer intSerializer = new IntegerSerializer(); - private final TopicPartition partition1 = new TopicPartition("store1", 1); - private final TopicPartition partition2 = new TopicPartition("store2", 1); + private final String jobId = "test-job"; + private final String storeName1 = "store1"; + private final String storeName2 = "store2"; + private final String storeChangelogTopicName1 = jobId + "-" + storeName1 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX; + private final String storeChangelogTopicName2 = jobId + "-" + storeName2 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX; + + private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1); + private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1); private final Set topicPartitions = 
Collections.emptySet(); private final ProcessorTopology topology = new ProcessorTopology( Collections.emptyList(), Collections.emptyMap(), Utils.mkList( - new MockStateStoreSupplier(partition1.topic(), false), - new MockStateStoreSupplier(partition2.topic(), true) + new MockStateStoreSupplier(storeName1, false), + new MockStateStoreSupplier(storeName2, true) ) ); @@ -91,25 +97,31 @@ public class StandbyTaskTest { @Before public void setup() { restoreStateConsumer.reset(); - restoreStateConsumer.updatePartitions("store1", Utils.mkList( - new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("store1", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("store1", 2, Node.noNode(), new Node[0], new Node[0]) + restoreStateConsumer.updatePartitions(storeChangelogTopicName1, Utils.mkList( + new PartitionInfo(storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(storeChangelogTopicName1, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0]) )); - restoreStateConsumer.updatePartitions("store2", Utils.mkList( - new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("store2", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("store2", 2, Node.noNode(), new Node[0], new Node[0]) + System.out.println("added " + storeChangelogTopicName1); + + restoreStateConsumer.updatePartitions(storeChangelogTopicName2, Utils.mkList( + new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0]) )); + + System.out.println("added " + storeChangelogTopicName2); } @Test public void testStorePartitions() throws Exception { + System.out.println("STARTED"); + File baseDir = Files.createTempDirectory("test").toFile(); try { StreamingConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); + StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions())); @@ -124,7 +136,7 @@ public class StandbyTaskTest { File baseDir = Files.createTempDirectory("test").toFile(); try { StreamingConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); + StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -143,7 +155,7 @@ public class StandbyTaskTest { File baseDir = Files.createTempDirectory("test").toFile(); try { StreamingConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); + StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -168,9 +180,9 @@ public class StandbyTaskTest { StandbyContextImpl context = (StandbyContextImpl) task.context(); MockStateStoreSupplier.MockStateStore store1 = - 
(MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition1.topic()); + (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName1); MockStateStoreSupplier.MockStateStore store2 = - (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition2.topic()); + (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName2); assertEquals(Collections.emptyList(), store1.keys); assertEquals(Utils.mkList(1, 2, 3), store2.keys); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index aae5a7d261a..1847e856641 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -103,7 +103,7 @@ public class StreamTaskTest { File baseDir = Files.createTempDirectory("test").toFile(); try { StreamingConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(0, 0), partitions, topology, consumer, producer, restoreStateConsumer, config, null); + StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), @@ -154,7 +154,7 @@ public class StreamTaskTest { File baseDir = Files.createTempDirectory("test").toFile(); try { StreamingConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(1, 1), partitions, topology, consumer, producer, restoreStateConsumer, config, null); + StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 9f3145011ea..5f0347dc15e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -59,7 +59,8 @@ import java.util.UUID; public class StreamThreadTest { - private UUID uuid = UUID.randomUUID(); + private String clientId = "clientId"; + private UUID processId = UUID.randomUUID(); private TopicPartition t1p1 = new TopicPartition("topic1", 1); private TopicPartition t1p2 = new TopicPartition("topic1", 2); @@ -90,7 +91,7 @@ public class StreamThreadTest { ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + 4); // version buf.putInt(1); - // encode client clientUUID + // encode client processId buf.putLong(uuid.getMostSignificantBits()); buf.putLong(uuid.getLeastSignificantBits()); // previously running tasks @@ -132,7 +133,7 @@ public class StreamThreadTest { Producer producer, Consumer restoreConsumer, StreamingConfig config) { - super(id, partitions, topology, consumer, producer, restoreConsumer, config, null); + super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null); } @Override @@ -159,7 +160,7 @@ public class StreamThreadTest { builder.addSource("source3", "topic3"); builder.addProcessor("processor", new 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection partitionsForTask) {
                 ProcessorTopology topology = builder.build(id.topicGroupId);
@@ -279,7 +280,7 @@ public class StreamThreadTest {
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), mockTime) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
             @Override
             public void maybeClean() {
                 super.maybeClean();
@@ -401,7 +402,7 @@ public class StreamThreadTest {
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), mockTime) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
             @Override
             public void maybeCommit() {
                 super.maybeCommit();
@@ -471,7 +472,7 @@ public class StreamThreadTest {
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
 
-        partitionAssignor.configure(config.getConsumerConfigs(thread));
+        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
 
         Map assignments = partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index acc9a9d16aa..3119bee0f41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -31,13 +31,14 @@ public class SubscriptionInfoTest {
 
     @Test
     public void testEncodeDecode() {
-        UUID clientUUID = UUID.randomUUID();
+        UUID processId = UUID.randomUUID();
+
         Set activeTasks = new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
         Set standbyTasks = new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
 
-        SubscriptionInfo info = new SubscriptionInfo(clientUUID, activeTasks, standbyTasks);
+        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks);
         SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
 
         assertEquals(info, decoded);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 28cc3afce29..9e247412ae6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -269,7 +269,7 @@ public class KeyValueStoreTestDriver {
             }
 
             @Override
-            public void register(StateStore store, StateRestoreCallback func) {
+            public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
                 storeMap.put(store.name(), store);
                 restoreEntries(func);
             }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 81a9addabea..5d42a63ee79 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -127,7 +127,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public void register(StateStore store, StateRestoreCallback func) {
+    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
         storeMap.put(store.name(), store);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 16635b7efb1..3b17afef8e9 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -66,7 +66,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
 
         @Override
         public void init(ProcessorContext context) {
-            context.register(this, stateRestoreCallback);
+            context.register(this, true, stateRestoreCallback);
             initialized = true;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index fdb4d574a7e..879c172f2d7 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -124,6 +125,8 @@ public class ProcessorTopologyTestDriver {
 
     private final Serializer bytesSerializer = new ByteArraySerializer();
 
+    private final String jobId = "test-driver-job";
+
     private final TaskId id;
     private final ProcessorTopology topology;
     private final StreamTask task;
@@ -158,6 +161,7 @@ public class ProcessorTopologyTestDriver {
         }
 
         task = new StreamTask(id,
+                              jobId,
                               partitionsByTopic.values(),
                               topology,
                               consumer,
@@ -324,12 +328,12 @@ public class ProcessorTopologyTestDriver {
         };
         // For each store name ...
        for (String storeName : storeNames) {
-            String topicName = storeName;
+            String topicName = jobId + "-" + storeName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
            // Set up the restore-state topic ...
            // consumer.subscribe(new TopicPartition(topicName, 1));
            // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
            List partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
+            partitionInfos.add(new PartitionInfo(topicName , id.partition, null, null, null));
            consumer.updatePartitions(topicName, partitionInfos);
            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
        }
diff --git a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
index 04c8f61daca..3da1ca7ed86 100644
--- a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
+++ b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
@@ -49,7 +49,7 @@ public class UnlimitedWindowDef implements WindowSupplier {
 
         @Override
         public void init(ProcessorContext context) {
-            context.register(this, null);
+            context.register(this, true, null);
         }
 
         @Override
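A note on the changelog topic naming that the test changes above rely on: store changelog topics are now derived from the job id rather than from the bare store name (see the StandbyTaskTest fields storeChangelogTopicName1/2 and the topicName construction in ProcessorTopologyTestDriver). The following standalone Java sketch only illustrates that convention; the "-changelog" literal and the helper method name are assumptions made for illustration, not the actual ProcessorStateManager code, whose constant value is not shown in this patch.

// Sketch of the changelog topic naming convention implied by the test driver change.
// Assumption: STATE_CHANGELOG_TOPIC_SUFFIX resolves to "-changelog"; the real constant
// is defined in ProcessorStateManager and is not part of this diff.
public class ChangelogTopicNameSketch {

    static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; // assumed value

    // Hypothetical helper mirroring jobId + "-" + storeName + suffix from the diff.
    static String changelogTopicFor(String jobId, String storeName) {
        return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    public static void main(String[] args) {
        // e.g. the test driver's jobId "test-driver-job" and a store named "store1"
        System.out.println(changelogTopicFor("test-driver-job", "store1"));
        // prints: test-driver-job-store1-changelog
    }
}

Prefixing the changelog topic with the job id presumably keeps the state topics of different stream jobs on the same cluster from colliding, which is why the tests register partitions under the derived changelog topic names instead of the raw store names.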