diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index c3474b09344..88a420a75b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -629,7 +629,7 @@ public class TopologyBuilderTest { goodNodeName) .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); - final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME); + final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder); driver.process("topic", null, null); } catch (final StreamsException e) { final Throwable cause = e.getCause(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 322c17810c2..65b3e2f67ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -189,7 +189,7 @@ public class ProcessorTopologyTest { @Test public void testDrivingStatefulTopology() { String storeName = "entries"; - driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName); + driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName)); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); @@ -214,7 +214,7 @@ public class ProcessorTopologyTest { final TopologyBuilder topologyBuilder = this.builder .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store"))); - driver = new ProcessorTopologyTestDriver(config, topologyBuilder, "my-store"); + driver = new ProcessorTopologyTestDriver(config, topologyBuilder); driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); assertEquals("value1", globalStore.get("key1")); @@ -235,6 +235,17 @@ public class ProcessorTopologyTest { assertNoOutputRecord(OUTPUT_TOPIC_1); } + @Test + public void testDrivingForwardToSourceTopology() { + driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology()); + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3"); + } + @Test public void testDrivingInternalRepartitioningTopology() { driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology()); @@ -380,6 +391,13 @@ public class ProcessorTopologyTest { .addSink("sink1", OUTPUT_TOPIC_1, "source1"); } + private TopologyBuilder createForwardToSourceTopology() { + return builder.addSource("source-1", INPUT_TOPIC_1) + .addSink("sink-1", OUTPUT_TOPIC_1, "source-1") + .addSource("source-2", OUTPUT_TOPIC_1) + .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"); + } + private TopologyBuilder createSimpleMultiSourceTopology(int partition) { return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index b704aa70e36..1e97e11b41b 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -52,7 +52,6 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; @@ -143,9 +142,10 @@ public class ProcessorTopologyTestDriver { private final Serializer bytesSerializer = new ByteArraySerializer(); - private final String applicationId = "test-driver-application"; + private final static String APPLICATION_ID = "test-driver-application"; + private final static int PARTITION_ID = 0; + private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID); - private final TaskId id; private final ProcessorTopology topology; private final MockConsumer consumer; private final MockProducer producer; @@ -163,11 +163,9 @@ public class ProcessorTopologyTestDriver { * Create a new test driver instance. * @param config the stream configuration for the topology * @param builder the topology builder that will be used to create the topology instance - * @param storeNames the optional names of the state stores that are used by the topology */ - public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) { - id = new TaskId(0, 0); - topology = builder.setApplicationId(applicationId).build(null); + public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder) { + topology = builder.setApplicationId(APPLICATION_ID).build(null); globalTopology = builder.buildGlobalStateTopology(); // Set up the consumer and producer ... @@ -175,10 +173,10 @@ public class ProcessorTopologyTestDriver { producer = new MockProducer(true, bytesSerializer, bytesSerializer) { @Override public List partitionsFor(String topic) { - return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)); + return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null)); } }; - restoreStateConsumer = createRestoreConsumer(id, storeNames); + restoreStateConsumer = createRestoreConsumer(TASK_ID, topology.storeToChangelogTopic()); // Identify internal topics for forwarding in process ... for (TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) { @@ -187,14 +185,14 @@ public class ProcessorTopologyTestDriver { // Set up all of the topic+partition information and subscribe the consumer to each ... for (String topic : topology.sourceTopics()) { - TopicPartition tp = new TopicPartition(topic, 1); + TopicPartition tp = new TopicPartition(topic, PARTITION_ID); partitionsByTopic.put(topic, tp); offsetsByTopicPartition.put(tp, new AtomicLong()); } consumer.assign(offsetsByTopicPartition.keySet()); - final StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath(), Time.SYSTEM); + final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM); final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics); @@ -218,8 +216,8 @@ public class ProcessorTopologyTestDriver { } if (!partitionsByTopic.isEmpty()) { - task = new StreamTask(id, - applicationId, + task = new StreamTask(TASK_ID, + APPLICATION_ID, partitionsByTopic.values(), topology, consumer, @@ -263,8 +261,8 @@ public class ProcessorTopologyTestDriver { } outputRecords.add(record); - // Forward back into the topology if the produced record is to an internal topic ... - if (internalTopics.contains(record.topic())) { + // Forward back into the topology if the produced record is to an internal or a source topic ... + if (internalTopics.contains(record.topic()) || topology.sourceTopics().contains(record.topic())) { process(record.topic(), record.key(), record.value(), record.timestamp()); } } @@ -339,7 +337,7 @@ public class ProcessorTopologyTestDriver { /** * Get the {@link StateStore} with the given name. The name should have been supplied via - * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is + * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is * presumed to be used by a Processor within the topology. *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to @@ -355,7 +353,7 @@ public class ProcessorTopologyTestDriver { /** * Get the {@link KeyValueStore} with the given name. The name should have been supplied via - * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is + * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is * presumed to be used by a Processor within the topology. *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to @@ -393,10 +391,10 @@ public class ProcessorTopologyTestDriver { * driver object unless this method is overwritten with a functional consumer. * * @param id the ID of the stream task - * @param storeNames the names of the stores that this + * @param storeToChangelogTopic the map of the names of the stores to the changelog topics * @return the mock consumer; never null */ - protected MockConsumer createRestoreConsumer(TaskId id, String... storeNames) { + protected MockConsumer createRestoreConsumer(TaskId id, Map storeToChangelogTopic) { MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST) { @Override public synchronized void seekToEnd(Collection partitions) { @@ -414,16 +412,16 @@ public class ProcessorTopologyTestDriver { return 0L; } }; - // For each store name ... - for (String storeName : storeNames) { - String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName); + // For each store ... + for (Map.Entry storeAndTopic: storeToChangelogTopic.entrySet()) { + String topicName = storeAndTopic.getValue(); // Set up the restore-state topic ... // consumer.subscribe(new TopicPartition(topicName, 1)); // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ... List partitionInfos = new ArrayList<>(); - partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null)); + partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null)); consumer.updatePartitions(topicName, partitionInfos); - consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L)); + consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L)); } return consumer; }