KAFKA-4828: ProcessorTopologyTestDriver does not work when using through

This resolves the following issues in the ProcessorTopologyTestDriver:

- The driver no longer assumes an internal changelog topic for state stores populated via `through()` and `table()`; it uses the topology's store-to-changelog mapping instead
- The driver now forwards a produced record back into the topology when its destination topic is also a source topic (see the sketch below)
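
For context, a minimal sketch of a topology that exercises both fixes (topic and store names are hypothetical, using the 0.10.2-era KStreamBuilder API):

    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    KStreamBuilder builder = new KStreamBuilder();
    // table() materializes a store whose changelog is the source topic itself,
    // so the driver must not invent a separate "-changelog" topic for it.
    KTable<String, String> table = builder.table("input-topic", "input-store");
    // through() is a sink plus a new source on the same topic, so records the
    // driver captures for "intermediate-topic" must be fed back into the topology.
    table.through("intermediate-topic", "intermediate-store")
         .to("output-topic");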

Jira ticket: https://issues.apache.org/jira/browse/KAFKA-4828

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Hamidreza Afzali <hrafzali@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2629 from hrafzali/KAFKA-4828_ProcessorTopologyTestDriver_through
Committed by Guozhang Wang on 2017-03-05 22:42:16 -08:00
Parent: b1272500bd
Commit: 5b013d9cd2

3 changed files with 43 additions and 27 deletions

TopologyBuilderTest.java

@@ -629,7 +629,7 @@ public class TopologyBuilderTest {
                                    goodNodeName)
                     .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME);
+            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder);
             driver.process("topic", null, null);
         } catch (final StreamsException e) {
             final Throwable cause = e.getCause();
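
The effect on callers is visible above: store names no longer have to be threaded through to the driver. A before/after sketch:

    // Before: stores were listed by hand, and the driver assumed an internal
    // "<applicationId>-<storeName>-changelog" topic for each one.
    driver = new ProcessorTopologyTestDriver(config, builder, "my-store");

    // After: the driver derives store-to-changelog pairs from the topology.
    driver = new ProcessorTopologyTestDriver(config, builder);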

ProcessorTopologyTest.java

@@ -189,7 +189,7 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
-        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
+        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName));
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -214,7 +214,7 @@ public class ProcessorTopologyTest {
         final TopologyBuilder topologyBuilder = this.builder
             .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
-        driver = new ProcessorTopologyTestDriver(config, topologyBuilder, "my-store");
+        driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
         driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertEquals("value1", globalStore.get("key1"));
@@ -235,6 +235,17 @@ public class ProcessorTopologyTest {
         assertNoOutputRecord(OUTPUT_TOPIC_1);
     }
 
+    @Test
+    public void testDrivingForwardToSourceTopology() {
+        driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology());
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3");
+    }
+
     @Test
     public void testDrivingInternalRepartitioningTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology());
@@ -380,6 +391,13 @@ public class ProcessorTopologyTest {
             .addSink("sink1", OUTPUT_TOPIC_1, "source1");
     }
 
+    private TopologyBuilder createForwardToSourceTopology() {
+        return builder.addSource("source-1", INPUT_TOPIC_1)
+                .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
+                .addSource("source-2", OUTPUT_TOPIC_1)
+                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
+    }
+
     private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
         return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")
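
The new test only passes if records sent to OUTPUT_TOPIC_1 (a sink topic that source-2 also consumes) are looped back through the topology and re-emitted on OUTPUT_TOPIC_2. assertNextOutputRecord is a helper in this test class; reading the driver's output directly would look roughly like this (same constants as above):

    ProducerRecord<String, String> record =
        driver.readOutput(OUTPUT_TOPIC_2, STRING_DESERIALIZER, STRING_DESERIALIZER);
    assertEquals("key1", record.key());
    assertEquals("value1", record.value());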

ProcessorTopologyTestDriver.java

@@ -52,7 +52,6 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -143,9 +142,10 @@ public class ProcessorTopologyTestDriver {
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
 
-    private final String applicationId = "test-driver-application";
+    private final static String APPLICATION_ID = "test-driver-application";
+    private final static int PARTITION_ID = 0;
+    private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
 
-    private final TaskId id;
     private final ProcessorTopology topology;
     private final MockConsumer<byte[], byte[]> consumer;
     private final MockProducer<byte[], byte[]> producer;
@@ -163,11 +163,9 @@ public class ProcessorTopologyTestDriver {
      * Create a new test driver instance.
      * @param config the stream configuration for the topology
      * @param builder the topology builder that will be used to create the topology instance
-     * @param storeNames the optional names of the state stores that are used by the topology
      */
-    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
-        id = new TaskId(0, 0);
-        topology = builder.setApplicationId(applicationId).build(null);
+    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder) {
+        topology = builder.setApplicationId(APPLICATION_ID).build(null);
         globalTopology = builder.buildGlobalStateTopology();
 
         // Set up the consumer and producer ...
@@ -175,10 +173,10 @@ public class ProcessorTopologyTestDriver {
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
             @Override
             public List<PartitionInfo> partitionsFor(String topic) {
-                return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null));
+                return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
             }
         };
-        restoreStateConsumer = createRestoreConsumer(id, storeNames);
+        restoreStateConsumer = createRestoreConsumer(TASK_ID, topology.storeToChangelogTopic());
 
         // Identify internal topics for forwarding in process ...
         for (TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
@@ -187,14 +185,14 @@ public class ProcessorTopologyTestDriver {
         // Set up all of the topic+partition information and subscribe the consumer to each ...
         for (String topic : topology.sourceTopics()) {
-            TopicPartition tp = new TopicPartition(topic, 1);
+            TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
             partitionsByTopic.put(topic, tp);
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
         consumer.assign(offsetsByTopicPartition.keySet());
 
-        final StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
+        final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics);
@@ -218,8 +216,8 @@ public class ProcessorTopologyTestDriver {
         }
 
         if (!partitionsByTopic.isEmpty()) {
-            task = new StreamTask(id,
-                                  applicationId,
+            task = new StreamTask(TASK_ID,
+                                  APPLICATION_ID,
                                   partitionsByTopic.values(),
                                   topology,
                                   consumer,
@@ -263,8 +261,8 @@ public class ProcessorTopologyTestDriver {
             }
             outputRecords.add(record);
 
-            // Forward back into the topology if the produced record is to an internal topic ...
-            if (internalTopics.contains(record.topic())) {
+            // Forward back into the topology if the produced record is to an internal or a source topic ...
+            if (internalTopics.contains(record.topic()) || topology.sourceTopics().contains(record.topic())) {
                 process(record.topic(), record.key(), record.value(), record.timestamp());
             }
         }
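
The hunk above is the core of the forwarding fix: `through("topic")` compiles down to a sink plus a new source on the same topic, so a downstream sub-topology only receives data if the driver re-processes the records it captures. Paraphrasing the new rule in the MockProducer completion hook:

    // A record produced to any topic the topology also consumes, meaning
    // internal repartition topics (as before) and user-defined source
    // topics (new), is pushed back through process().
    if (internalTopics.contains(record.topic())
            || topology.sourceTopics().contains(record.topic())) {
        process(record.topic(), record.key(), record.value(), record.timestamp());
    }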
@@ -339,7 +337,7 @@ public class ProcessorTopologyTestDriver {
     /**
      * Get the {@link StateStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -355,7 +353,7 @@ public class ProcessorTopologyTestDriver {
     /**
      * Get the {@link KeyValueStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -393,10 +391,10 @@ public class ProcessorTopologyTestDriver {
      * driver object unless this method is overwritten with a functional consumer.
      *
      * @param id the ID of the stream task
-     * @param storeNames the names of the stores that this
+     * @param storeToChangelogTopic the map of the names of the stores to the changelog topics
      * @return the mock consumer; never null
      */
-    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String... storeNames) {
+    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, Map<String, String> storeToChangelogTopic) {
         MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
             @Override
             public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
@@ -414,16 +412,16 @@ public class ProcessorTopologyTestDriver {
                 return 0L;
             }
         };
 
-        // For each store name ...
-        for (String storeName : storeNames) {
-            String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
+        // For each store ...
+        for (Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) {
+            String topicName = storeAndTopic.getValue();
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
             List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
+            partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
-            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
+            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
         }
         return consumer;
     }
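
This is where the changelog fix lands: instead of fabricating "<applicationId>-<storeName>-changelog" via ProcessorStateManager for every store name handed to the constructor, the restore consumer is driven by the topology's own mapping. For a store materialized by `through()` or `table()` the changelog is the user topic itself, so no phantom internal topic is set up. Roughly (hypothetical store and topic names):

    // topology.storeToChangelogTopic() already distinguishes the two cases:
    //   "aggregate-store"    -> "test-driver-application-aggregate-store-changelog"
    //   "intermediate-store" -> "intermediate-topic"   (store written via through())
    Map<String, String> storeToChangelog = topology.storeToChangelogTopic();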