diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7e3cab9054e..6e5aec5413b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -85,7 +85,7 @@ public class TopologyBuilder {
             this.name = name;
         }
 
-        public abstract ProcessorNode build();
+        public abstract ProcessorNode build(String jobId);
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
@@ -105,7 +105,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build() {
+        public ProcessorNode build(String jobId) {
             return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
@@ -124,12 +124,12 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build() {
+        public ProcessorNode build(String jobId) {
             return new SourceNode(name, keyDeserializer, valDeserializer);
         }
     }
 
-    private static class SinkNodeFactory extends NodeFactory {
+    private class SinkNodeFactory extends NodeFactory {
         public final String[] parents;
         public final String topic;
         private Serializer keySerializer;
@@ -147,8 +147,13 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build() {
-            return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
+        public ProcessorNode build(String jobId) {
+            if (internalTopicNames.contains(topic)) {
+                // prefix the job id to the internal topic name
+                return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer, partitioner);
+            } else {
+                return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
+            }
         }
     }
 
@@ -491,7 +496,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups() {
+    public Map<Integer, TopicsInfo> topicGroups(String jobId) {
         Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
         if (nodeGroups == null)
@@ -506,27 +511,35 @@ public class TopologyBuilder {
                 // if the node is a source node, add to the source topics
                 String[] topics = nodeToSourceTopics.get(node);
                 if (topics != null) {
-                    sourceTopics.addAll(Arrays.asList(topics));
-
-                    // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
-                        if (this.internalTopicNames.contains(topic))
-                            internalSourceTopics.add(topic);
+                        if (this.internalTopicNames.contains(topic)) {
+                            // prefix the job id to the internal topic name
+                            String internalTopic = jobId + "-" + topic;
+                            internalSourceTopics.add(internalTopic);
+                            sourceTopics.add(internalTopic);
+                        } else {
+                            sourceTopics.add(topic);
+                        }
                     }
                 }
 
                 // if the node is a sink node, add to the sink topics
                 String topic = nodeToSinkTopic.get(node);
-                if (topic != null)
-                    sinkTopics.add(topic);
+                if (topic != null) {
+                    if (internalTopicNames.contains(topic)) {
+                        // prefix the job id to the change log topic name
+                        sinkTopics.add(jobId + "-" + topic);
+                    } else {
+                        sinkTopics.add(topic);
+                    }
+                }
 
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
-
-                    // we store the changelog topic here without the job id prefix
-                    // since it is within a single job and is only used for
                     if (stateFactory.isInternal && stateFactory.users.contains(node)) {
-                        stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+                        // prefix the job id to the change log topic name
+                        stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
                     }
                 }
             }
@@ -586,7 +599,7 @@ public class TopologyBuilder {
 
         return nodeGroups;
     }
-    
+
     /**
      * Asserts that the streams of the specified source nodes must be copartitioned.
      *
@@ -624,7 +637,7 @@ public class TopologyBuilder {
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public ProcessorTopology build(Integer topicGroupId) {
+    public ProcessorTopology build(String jobId, Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -632,11 +645,11 @@ public class TopologyBuilder {
             // when nodeGroup is null, we build the full topology. this is used in some tests.
             nodeGroup = null;
         }
-        return build(nodeGroup);
+        return build(jobId, nodeGroup);
     }
 
     @SuppressWarnings("unchecked")
-    private ProcessorTopology build(Set<String> nodeGroup) {
+    private ProcessorTopology build(String jobId, Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -645,7 +658,7 @@ public class TopologyBuilder {
         // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
         for (NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                ProcessorNode node = factory.build();
+                ProcessorNode node = factory.build(jobId);
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
@@ -660,7 +673,12 @@ public class TopologyBuilder {
                     }
                 } else if (factory instanceof SourceNodeFactory) {
                     for (String topic : ((SourceNodeFactory) factory).topics) {
-                        topicSourceMap.put(topic, (SourceNode) node);
+                        if (internalTopicNames.contains(topic)) {
+                            // prefix the job id to the internal topic name
+                            topicSourceMap.put(jobId + "-" + topic, (SourceNode) node);
+                        } else {
+                            topicSourceMap.put(topic, (SourceNode) node);
+                        }
                     }
                 } else if (factory instanceof SinkNodeFactory) {
                     for (String parent : ((SinkNodeFactory) factory).parents) {
@@ -679,7 +697,15 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
-    public Set<String> sourceTopics() {
-        return Collections.unmodifiableSet(sourceTopicNames);
+    public Set<String> sourceTopics(String jobId) {
+        Set<String> topics = new HashSet<>();
+        for (String topic : sourceTopicNames) {
+            if (internalTopicNames.contains(topic)) {
+                topics.add(jobId + "-" + topic);
+            } else {
+                topics.add(topic);
+            }
+        }
+        return Collections.unmodifiableSet(topics);
     }
 }
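Note on the change above: the jobId + "-" + topic decoration for internal topics now appears inline in SinkNodeFactory.build(), topicGroups(), build(), and sourceTopics(). A minimal sketch of the shared rule, factored into a helper for illustration only (decorateTopic is hypothetical and not part of this patch):

    // Hypothetical helper showing the naming rule the patch applies inline:
    // topics registered as internal are qualified with the job id so that
    // different jobs sharing a cluster cannot collide on topic names.
    private String decorateTopic(String jobId, String topic) {
        return internalTopicNames.contains(topic) ? jobId + "-" + topic : topic;
    }

Factoring it out would keep the four call sites from drifting apart.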
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 13f269b5fc9..266df3ed099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
-        this.topicGroups = streamThread.builder.topicGroups();
+        this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
@@ -350,7 +350,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         topicToTaskIds.putAll(internalSourceTopicToTaskIds);
 
         for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
-            String topic = streamThread.jobId + "-" + entry.getKey();
+            String topic = entry.getKey();
 
             // the expected number of partitions is the max value of TaskId.partition + 1
             int numPartitions = 0;
@@ -445,7 +445,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId, stateName));
     }
 
     public Set<TaskId> tasksForPartition(TopicPartition partition) {
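tasksForState now resolves the changelog topic through ProcessorStateManager.storeChangelogTopic(jobId, stateName) instead of concatenating the suffix inline. Judging from the expressions it replaces here and in the tests below, the helper presumably composes the name as follows (a sketch under that assumption, not the actual implementation):

    // Presumed shape of the helper referenced above: job id prefix,
    // then store name, then the changelog suffix.
    public static String storeChangelogTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
    }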
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 4ce86ac91ca..e9343e0afeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,7 +173,7 @@ public class StreamThread extends Thread {
         this.jobId = jobId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics();
+        this.sourceTopics = builder.sourceTopics(jobId);
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -580,7 +580,7 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(id.topicGroupId);
+        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
 
         return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
     }
@@ -650,7 +650,7 @@ public class StreamThread extends Thread {
     protected StandbyTask createStandbyTask(TaskId id,
                                             Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(id.topicGroupId);
+        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
             return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
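With the job id threaded through sourceTopics(), topicGroups(), and build(), every internal topic and changelog a stream thread touches is named under its job. A self-contained illustration of the resulting names (the job id and store names here are made up; this assumes STATE_CHANGELOG_TOPIC_SUFFIX is "-changelog"):

    public class TopicNamingExample {
        public static void main(String[] args) {
            String jobId = "my-job";
            // an internal (repartition) topic registered via addInternalTopic("counts-repartition")
            System.out.println(jobId + "-" + "counts-repartition");    // prints: my-job-counts-repartition
            // the changelog topic for a state store named "counts"
            System.out.println(jobId + "-" + "counts" + "-changelog"); // prints: my-job-counts-changelog
        }
    }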
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1ce56ff7882..3d3a9e3d6c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -137,6 +137,6 @@ public class KStreamImplTest {
                      1 + // to
                      2 + // through
                      1, // process
-                     builder.build(null).processors().size());
+                     builder.build("X", null).processors().size());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 0635bd20283..9af313a958f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -142,8 +142,14 @@ public class TopologyBuilderTest {
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
+        builder.addInternalTopic("topic-3");
 
-        assertEquals(3, builder.sourceTopics().size());
+        Set<String> expected = new HashSet<>();
+        expected.add("topic-1");
+        expected.add("topic-2");
+        expected.add("X-topic-3");
+
+        assertEquals(expected, builder.sourceTopics("X"));
     }
 
     @Test(expected = TopologyBuilderException.class)
@@ -184,13 +190,13 @@ public class TopologyBuilderTest {
         StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
         builder.addStateStore(supplier);
 
-        suppliers = builder.build(null).stateStoreSuppliers();
+        suppliers = builder.build("X", null).stateStoreSuppliers();
         assertEquals(0, suppliers.size());
 
         builder.addSource("source-1", "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.connectProcessorAndStateStores("processor-1", "store-1");
 
-        suppliers = builder.build(null).stateStoreSuppliers();
+        suppliers = builder.build("X", null).stateStoreSuppliers();
         assertEquals(1, suppliers.size());
         assertEquals(supplier.name(), suppliers.get(0).name());
     }
@@ -212,7 +218,7 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(0, new TopicsInfo(Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptySet(), Collections.emptySet()));
@@ -250,12 +256,12 @@ public class TopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
-        expectedTopicGroups.put(1, new TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
-        expectedTopicGroups.put(2, new TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+        expectedTopicGroups.put(0, new TopicsInfo(Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));
+        expectedTopicGroups.put(1, new TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-2"))));
+        expectedTopicGroups.put(2, new TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-3"))));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -275,9 +281,9 @@ public class TopologyBuilderTest {
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        ProcessorTopology topology0 = builder.build(0);
-        ProcessorTopology topology1 = builder.build(1);
-        ProcessorTopology topology2 = builder.build(2);
+        ProcessorTopology topology0 = builder.build("X", 0);
+        ProcessorTopology topology1 = builder.build("X", 1);
+        ProcessorTopology topology2 = builder.build("X", 2);
 
         assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
         assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
mkSet("topic-3", "topic-4"), Collections.emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); - expectedTopicGroups.put(2, new TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); + expectedTopicGroups.put(0, new TopicsInfo(Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1")))); + expectedTopicGroups.put(1, new TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-2")))); + expectedTopicGroups.put(2, new TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-3")))); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -275,9 +281,9 @@ public class TopologyBuilderTest { builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1"); builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); - ProcessorTopology topology0 = builder.build(0); - ProcessorTopology topology1 = builder.build(1); - ProcessorTopology topology2 = builder.build(2); + ProcessorTopology topology0 = builder.build("X", 0); + ProcessorTopology topology1 = builder.build("X", 1); + ProcessorTopology topology2 = builder.build("X", 2); assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors())); assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 40cce93a79a..c8115b86896 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -97,7 +97,7 @@ public class ProcessorTopologyTest { builder.addSink("sink-1", "topic-3", "processor-1"); builder.addSink("sink-2", "topic-4", "processor-1", "processor-2"); - final ProcessorTopology topology = builder.build(null); + final ProcessorTopology topology = builder.build("X", null); assertEquals(6, topology.processors().size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 9ff0af07af4..7f37bdafaab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -521,7 +521,7 @@ public class StreamPartitionAssignorTest { builder.addSink("sink1", "topicX", "processor1"); builder.addSource("source2", "topicX"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); - List topics = Utils.mkList("topic1", "topicX"); + List topics = Utils.mkList("topic1", "test-topicX"); Set allTasks = Utils.mkSet(task0, task1, task2); UUID uuid1 = UUID.randomUUID(); @@ -543,9 +543,7 @@ public class StreamPartitionAssignorTest { Map assignments = partitionAssignor.assign(metadata, subscriptions); // check prepared internal topics - 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e0727476b80..eaaf842b4f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -171,7 +171,7 @@ public class StreamThreadTest {
         StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                ProcessorTopology topology = builder.build(id.topicGroupId);
+                ProcessorTopology topology = builder.build("X", id.topicGroupId);
                 return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
@@ -298,7 +298,7 @@ public class StreamThreadTest {
 
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                ProcessorTopology topology = builder.build(id.topicGroupId);
+                ProcessorTopology topology = builder.build("X", id.topicGroupId);
                 return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
@@ -420,7 +420,7 @@ public class StreamThreadTest {
 
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                ProcessorTopology topology = builder.build(id.topicGroupId);
+                ProcessorTopology topology = builder.build("X", id.topicGroupId);
                 return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index c0c5c392096..edbcb4a9faa 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -53,7 +53,7 @@ public class KStreamTestDriver {
                              File stateDir,
                              Serializer keySerializer, Deserializer keyDeserializer,
                              Serializer valSerializer, Deserializer valDeserializer) {
-        this.topology = builder.build(null);
+        this.topology = builder.build("X", null);
         this.stateDir = stateDir;
         this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
@@ -127,7 +127,7 @@ public class KStreamTestDriver {
         public MockRecordCollector() {
             super(null);
         }
-        
+
         @Override
         public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer,
                          StreamPartitioner partitioner) {
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 34fd10c5799..cf17dbe99a7 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -146,7 +146,7 @@ public class ProcessorTopologyTestDriver {
      */
    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
        id = new TaskId(0, 0);
-        topology = builder.build(null);
+        topology = builder.build("X", null);
 
        // Set up the consumer and producer ...
        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
index 552390999d4..d6746419462 100644
--- a/tests/kafkatest/tests/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams_bounce_test.py
@@ -41,7 +41,6 @@ class StreamsBounceTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @ignore
     def test_bounce(self):
         """
         Start a smoke test client, then abort (kill -9) and restart it a few times.
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
index ea05c5f4cd0..e3c465af32f 100644
--- a/tests/kafkatest/tests/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams_smoke_test.py
@@ -44,7 +44,6 @@ class StreamsSmokeTest(KafkaTest):
         self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @ignore
     def test_streams(self):
         """
         Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.