KAFKA-10629: TopologyTestDriver should not require a Properties argument (#9660)

Implements KIP-680.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Rohit Deshpande 2020-12-04 10:51:12 -08:00 committed by GitHub
parent 4f2f08eb00
commit 4e9c7fc8a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 60 additions and 47 deletions

View File

@ -112,7 +112,7 @@ public class StreamsBuilderTest {
}
}
);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), new Properties())) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build())) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
inputTopic.pipeInput("hey", "there");

View File

@ -37,7 +37,6 @@ import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@ -48,9 +47,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -353,8 +349,7 @@ public class TopologyTest {
.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
try {
new TopologyTestDriver(topology, mkProperties(
mkMap(mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()))));
new TopologyTestDriver(topology);
fail("Should have thrown StreamsException");
} catch (final StreamsException e) {
final String error = e.toString();

View File

@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
@ -35,8 +34,8 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.util.Random;
import static org.easymock.EasyMock.createMock;
@ -44,9 +43,6 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertTrue;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
public class AbstractStreamTest {
@ -86,8 +82,8 @@ public class AbstractStreamTest {
stream.randomFilter().process(supplier);
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), mkProperties(
mkMap(mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()))));
final TopologyTestDriver driver = new TopologyTestDriver(builder.build());
final TestInputTopic<Integer, String> inputTopic = driver.createInputTopic(topicName, new IntegerSerializer(), new StringSerializer());
for (final int expectedKey : expectedKeys) {
inputTopic.pipeInput(expectedKey, "V" + expectedKey);

View File

@ -79,7 +79,6 @@ public class KStreamTransformTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
new Properties(),
Instant.ofEpochMilli(0L))) {
final TestInputTopic<Integer, Integer> inputTopic =
driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());

View File

@ -36,7 +36,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.hamcrest.CoreMatchers.containsString;
@ -171,7 +171,7 @@ public class ProcessorNodeTest {
});
final Topology topology = builder.build();
final TopologyTestDriver testDriver = new TopologyTestDriver(topology, new Properties());
final TopologyTestDriver testDriver = new TopologyTestDriver(topology);
final TestInputTopic<String, String> topic = testDriver.createInputTopic("streams-plaintext-input", new StringSerializer(), new StringSerializer());
final StreamsException se = assertThrows(StreamsException.class, () -> topic.pipeInput("a-key", "a value"));

View File

@ -99,6 +99,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@ -242,6 +243,16 @@ public class TopologyTestDriver implements Closeable {
public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {}
};
/**
* Create a new test diver instance.
* Default test properties are used to initialize the driver instance
*
* @param topology the topology to be tested
*/
public TopologyTestDriver(final Topology topology) {
this(topology, new Properties());
}
/**
* Create a new test diver instance.
* Initialized the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}.
@ -254,6 +265,18 @@ public class TopologyTestDriver implements Closeable {
this(topology, config, null);
}
/**
* Create a new test diver instance.
*
* @param topology the topology to be tested
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
*/
public TopologyTestDriver(final Topology topology,
final Instant initialWallClockTimeMs) {
this(topology, new Properties(), initialWallClockTimeMs);
}
/**
* Create a new test diver instance.
*
@ -299,7 +322,8 @@ public class TopologyTestDriver implements Closeable {
final Properties configCopy = new Properties();
configCopy.putAll(config);
configCopy.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
configCopy.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-topology-test-driver-app-id");
// provide randomized dummy app-id if it's not specified
configCopy.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-topology-test-driver-app-id-" + ThreadLocalRandom.current().nextInt());
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
logIfTaskIdleEnabled(streamsConfig);

View File

@ -42,7 +42,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@ -75,7 +74,7 @@ public class TestTopicsTest {
final KStream<Long, String> source = builder.stream(INPUT_TOPIC_MAP, Consumed.with(longSerde, stringSerde));
final KStream<String, Long> mapped = source.map((key, value) -> new KeyValue<>(value, key));
mapped.to(OUTPUT_TOPIC_MAP, Produced.with(stringSerde, longSerde));
testDriver = new TopologyTestDriver(builder.build(), new Properties());
testDriver = new TopologyTestDriver(builder.build());
}
@After

View File

@ -471,7 +471,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldThrowForUnknownTopic() {
testDriver = new TopologyTestDriver(new Topology(), config);
testDriver = new TopologyTestDriver(new Topology());
assertThrows(
IllegalArgumentException.class,
() -> testDriver.pipeRecord(
@ -485,7 +485,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldThrowForMissingTime() {
testDriver = new TopologyTestDriver(new Topology(), config);
testDriver = new TopologyTestDriver(new Topology());
assertThrows(
IllegalStateException.class,
() -> testDriver.pipeRecord(
@ -506,7 +506,7 @@ public class TopologyTestDriverTest {
new ByteArraySerializer(),
new ByteArraySerializer());
testDriver = new TopologyTestDriver(new Topology(), config);
testDriver = new TopologyTestDriver(new Topology());
try {
testDriver.pipeInput(consumerRecordFactory.create((byte[]) null));
fail("Should have throw IllegalArgumentException");
@ -517,7 +517,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting() {
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
testDriver = new TopologyTestDriver(setupSourceSinkTopology());
final TestOutputTopic<String, String> outputTopic = new TestOutputTopic<>(
testDriver,
"unused-topic",
@ -531,7 +531,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldCaptureSinkTopicNamesIfWrittenInto() {
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
testDriver = new TopologyTestDriver(setupSourceSinkTopology());
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet()));
@ -577,7 +577,7 @@ public class TopologyTestDriverTest {
builder.globalTable(SOURCE_TOPIC_1, Materialized.as("globalTable"));
builder.stream(SOURCE_TOPIC_2).to(SOURCE_TOPIC_1);
testDriver = new TopologyTestDriver(builder.build(), config);
testDriver = new TopologyTestDriver(builder.build());
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet()));
@ -590,7 +590,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldProcessRecordForTopic() {
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
testDriver = new TopologyTestDriver(setupSourceSinkTopology());
pipeRecord(SOURCE_TOPIC_1, testRecord1);
final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
@ -602,7 +602,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldSetRecordMetadata() {
testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
testDriver = new TopologyTestDriver(setupSingleProcessorTopology());
pipeRecord(SOURCE_TOPIC_1, testRecord1);
@ -624,7 +624,7 @@ public class TopologyTestDriverTest {
//Test not migrated to non-deprecated methods, topic handling not based on record any more
@Test
public void shouldSendRecordViaCorrectSourceTopicDeprecated() {
testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2));
final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
@ -677,7 +677,7 @@ public class TopologyTestDriverTest {
},
processor);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
final org.apache.kafka.streams.test.ConsumerRecordFactory<Long, String> source1Factory =
new org.apache.kafka.streams.test.ConsumerRecordFactory<>(
@ -739,7 +739,7 @@ public class TopologyTestDriverTest {
},
processor);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
final Long source1Key = 42L;
final String source1Value = "anyString";
@ -772,7 +772,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldPassRecordHeadersIntoSerializersAndDeserializers() {
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
testDriver = new TopologyTestDriver(setupSourceSinkTopology());
final AtomicBoolean passedHeadersToKeySerializer = new AtomicBoolean(false);
final AtomicBoolean passedHeadersToValueSerializer = new AtomicBoolean(false);
@ -832,7 +832,7 @@ public class TopologyTestDriverTest {
topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), sourceName1);
topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), sourceName2);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
final Long source1Key = 42L;
final String source1Value = "anyString";
@ -867,7 +867,7 @@ public class TopologyTestDriverTest {
//Test not migrated to non-deprecated methods, List processing now in TestInputTopic
@Test
public void shouldProcessConsumerRecordList() {
testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2));
final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
@ -892,7 +892,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldForwardRecordsFromSubtopologyToSubtopology() {
testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies(), config);
testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies());
pipeRecord(SOURCE_TOPIC_1, testRecord1);
@ -909,7 +909,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldPopulateGlobalStore() {
testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);
testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1));
final KeyValueStore<byte[], byte[]> globalStore = testDriver.getKeyValueStore(SOURCE_TOPIC_1 + "-globalStore");
Assert.assertNotNull(globalStore);
@ -924,8 +924,8 @@ public class TopologyTestDriverTest {
public void shouldPunctuateOnStreamsTime() {
final MockPunctuator mockPunctuator = new MockPunctuator();
testDriver = new TopologyTestDriver(
setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator),
config);
setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator)
);
final List<Long> expectedPunctuations = new LinkedList<>();
@ -1052,7 +1052,7 @@ public class TopologyTestDriverTest {
"globalProcessorName",
voidProcessorSupplier);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
final Set<String> expectedStoreNames = new HashSet<>();
expectedStoreNames.add("store");
@ -1096,7 +1096,7 @@ public class TopologyTestDriverTest {
globalTimestampedKeyValueStoreName);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
// verify state stores
assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
@ -1175,7 +1175,7 @@ public class TopologyTestDriverTest {
globalTimestampedKeyValueStoreName);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
{
final IllegalArgumentException e = assertThrows(
@ -1367,7 +1367,7 @@ public class TopologyTestDriverTest {
"globalProcessorName",
voidProcessorSupplier);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
final Set<String> expectedStoreNames = new HashSet<>();
expectedStoreNames.add("store");
@ -1515,7 +1515,7 @@ public class TopologyTestDriverTest {
Serdes.Long()).withCachingEnabled(), // intentionally turn on caching to achieve better test coverage
"aggregator");
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
store = testDriver.getKeyValueStore("aggStore");
store.put("a", 21L);
@ -1655,7 +1655,7 @@ public class TopologyTestDriverTest {
topology.addSource(sourceName, pattern2Source1);
topology.addSink("sink", SINK_TOPIC_1, sourceName);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
pipeRecord(SOURCE_TOPIC_1, testRecord1);
final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
@ -1674,7 +1674,7 @@ public class TopologyTestDriverTest {
topology.addSource(sourceName, pattern2Source1);
topology.addSink("sink", SINK_TOPIC_1, sourceName);
testDriver = new TopologyTestDriver(topology, config);
testDriver = new TopologyTestDriver(topology);
try {
pipeRecord(SOURCE_TOPIC_1, testRecord1);
} catch (final TopologyException exception) {
@ -1737,7 +1737,7 @@ public class TopologyTestDriverTest {
topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology)) {
final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
@ -1809,7 +1809,7 @@ public class TopologyTestDriverTest {
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology)) {
final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());