MINOR: TopologyTestDriver should not require dummy parameters (#9477)

TopologyTestDriver comes with a paper cut that it passes through a
config requirement that application.id and bootstrap.servers must be
configured. But these configs are not required in the context of
TopologyTestDriver specifically. This change relaxes the requirement.

Reviewers: Boyang Chen <boyang@apache.org>, Matthias J. Sax <mjsax@apache.org>
This commit is contained in:
John Roesler 2020-10-22 08:19:01 -05:00 committed by GitHub
parent 47933088de
commit 58bd0a6ee3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 55 additions and 69 deletions

View File

@ -73,7 +73,6 @@ public class DeveloperGuideTesting {
// setup test driver
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
testDriver = new TopologyTestDriver(topology, props);

View File

@ -340,7 +340,6 @@ public class TopologyTest {
final String badNodeName = "badGuy";
final Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
mockStoreBuilder();

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
@ -31,7 +32,6 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@ -85,7 +85,6 @@ public class AbstractStreamTest {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);

View File

@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
@ -39,9 +38,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.junit.Assert.assertEquals;
public class KStreamTransformTest {
@ -83,10 +79,7 @@ public class KStreamTransformTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test")
)),
new Properties(),
Instant.ofEpochMilli(0L))) {
final TestInputTopic<Integer, Integer> inputTopic =
driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());

View File

@ -24,13 +24,13 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
@ -77,8 +77,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {
@ -144,8 +142,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {
@ -182,8 +178,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {
@ -265,8 +259,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {

View File

@ -183,7 +183,6 @@ public class KTableKTableForeignKeyJoinScenarioTest {
final String applicationId = "ktable-ktable-joinOnForeignKey";
final Properties streamsConfig = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));
@ -243,7 +242,6 @@ public class KTableKTableForeignKeyJoinScenarioTest {
final Properties config = new Properties();
final String safeTestName = safeUniqueTestName(getClass(), testName);
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

View File

@ -472,7 +472,6 @@ public class KTableTransformValuesTest {
public static Properties props() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());

View File

@ -19,20 +19,19 @@
package org.apache.kafka.streams.scala.utils
import java.time.Instant
import java.util.{Properties, UUID}
import java.util.Properties
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{StreamsConfig, TestInputTopic, TestOutputTopic, TopologyTestDriver}
import org.apache.kafka.test.TestUtils
import org.scalatest.Suite
trait TestDriver { this: Suite =>
def createTestDriver(builder: StreamsBuilder, initialWallClockTime: Instant = Instant.now()): TopologyTestDriver = {
val config = new Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
config.put(StreamsConfig.STATE_DIR_CONFIG, s"out/state-store-${UUID.randomUUID()}")
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath)
new TopologyTestDriver(builder.build(), config, initialWallClockTime)
}

View File

@ -297,7 +297,15 @@ public class TopologyTestDriver implements Closeable {
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
final Properties configCopy = new Properties();
configCopy.putAll(config);
if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
}
if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-topology-test-driver-app-id");
}
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
logIfTaskIdleEnabled(streamsConfig);
logContext = new LogContext("topology-test-driver ");

View File

@ -31,9 +31,9 @@ import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import java.io.File;
@ -218,7 +218,15 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
final Properties configCopy = new Properties();
configCopy.putAll(config);
if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
}
if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-mock-app-id");
}
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;

View File

@ -240,7 +240,15 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
* @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
*/
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
final Properties configCopy = new Properties();
configCopy.putAll(config);
if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
}
if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-mock-app-id");
}
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;

View File

@ -267,7 +267,6 @@ public class MockProcessorContextTest {
public void shouldCaptureApplicationAndRecordMetadata() {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
final AbstractProcessor<String, Object> processor = new AbstractProcessor<String, Object>() {
@Override
@ -389,7 +388,6 @@ public class MockProcessorContextTest {
public void fullConstructorShouldSetAllExpectedAttributes() {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

View File

@ -44,9 +44,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
@ -68,10 +65,6 @@ public class TestTopicsTest {
private final Serde<String> stringSerde = new Serdes.StringSerde();
private final Serde<Long> longSerde = new Serdes.LongSerde();
private final Properties config = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "TestTopicsTest"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
));
private final Instant testBaseTime = Instant.parse("2019-06-01T10:00:00Z");
@Before
@ -82,7 +75,7 @@ public class TestTopicsTest {
final KStream<Long, String> source = builder.stream(INPUT_TOPIC_MAP, Consumed.with(longSerde, stringSerde));
final KStream<String, Long> mapped = source.map((key, value) -> new KeyValue<>(value, key));
mapped.to(OUTPUT_TOPIC_MAP, Produced.with(stringSerde, longSerde));
testDriver = new TopologyTestDriver(builder.build(), config);
testDriver = new TopologyTestDriver(builder.build(), new Properties());
}
@After

View File

@ -124,11 +124,7 @@ public class TopologyTestDriverTest {
consumerRecordFactory.create(SOURCE_TOPIC_2, key2, value2, timestamp2);
private TopologyTestDriver testDriver;
private final Properties config = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
));
private final Properties config;
private KeyValueStore<String, Long> store;
private final StringDeserializer stringDeserializer = new StringDeserializer();
@ -145,7 +141,16 @@ public class TopologyTestDriverTest {
public TopologyTestDriverTest(final boolean eosEnabled) {
if (eosEnabled) {
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
));
} else {
config = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
));
}
}
@ -433,6 +438,11 @@ public class TopologyTestDriverTest {
return builder.build(config);
}
@Test
public void shouldNotRequireParameters() {
new TopologyTestDriver(setupSingleProcessorTopology(), new Properties());
}
@Test
public void shouldInitProcessor() {
testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
@ -1527,7 +1537,6 @@ public class TopologyTestDriverTest {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver-cleanup");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
@ -1685,10 +1694,6 @@ public class TopologyTestDriverTest {
@Test
public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
final Topology topology = new Topology();
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
topology.addProcessor(
@ -1715,7 +1720,7 @@ public class TopologyTestDriverTest {
topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
@ -1737,10 +1742,6 @@ public class TopologyTestDriverTest {
@Test
public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
final Topology topology = new Topology();
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
topology.addGlobalStore(
@ -1791,7 +1792,7 @@ public class TopologyTestDriverTest {
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());
@ -1817,9 +1818,6 @@ public class TopologyTestDriverTest {
@Test
public void shouldRespectTaskIdling() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
// This is the key to this test. Wall-clock time doesn't advance automatically in TopologyTestDriver,
// so with an idle time specified, TTD can't just expect all enqueued records to be processable.
properties.setProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "1000");

View File

@ -18,7 +18,6 @@ package org.apache.kafka.streams.test.wordcount;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
@ -89,15 +88,11 @@ public class WindowedWordCountProcessorTest {
@Test
public void shouldWorkWithPersistentStore() throws IOException {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
final File stateDir = TestUtils.tempDirectory();
try {
final MockProcessorContext<String, String> context = new MockProcessorContext<>(
properties,
new Properties(),
new TaskId(0, 0),
stateDir
);