diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 05a128b18c2..fd3dcfe7d35 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.internals.QuietStreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -235,7 +236,7 @@ public class TopologyTestDriver implements Closeable { private TopologyTestDriver(final InternalTopologyBuilder builder, final Properties config, final long initialWallClockTimeMs) { - final StreamsConfig streamsConfig = new StreamsConfig(config); + final StreamsConfig streamsConfig = new QuietStreamsConfig(config); mockWallClockTime = new MockTime(initialWallClockTimeMs); internalTopologyBuilder = builder; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java new file mode 100644 index 00000000000..61326687cfe --- /dev/null +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.streams.StreamsConfig; + +import java.util.Map; + +/** + * A {@link StreamsConfig} that does not log its configuration on construction. + * + * This producer cleaner output for unit tests using the {@code test-utils}, + * since logging the config is not really valuable in this context. + */ +public class QuietStreamsConfig extends StreamsConfig { + public QuietStreamsConfig(final Map props) { + super(props, false); + } +} diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index dc854b0a5f5..553428deb4b 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.internals.QuietStreamsConfig; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -201,7 +202,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S */ @SuppressWarnings({"WeakerAccess", "unused"}) public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) { - final StreamsConfig streamsConfig = new StreamsConfig(config); + final StreamsConfig streamsConfig = new QuietStreamsConfig(config); this.taskId = taskId; this.config = streamsConfig; this.stateDir = stateDir; @@ -382,12 +383,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S punctuators.add(capturedPunctuator); - return new Cancellable() { - @Override - public void cancel() { - capturedPunctuator.cancel(); - } - }; + return capturedPunctuator::cancel; } /** @@ -506,8 +502,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S // This interface is assumed by state stores that add change-logging. // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception. - throw new UnsupportedOperationException("MockProcessorContext does not provide record collection. " + - "For processor unit tests, use an in-memory state store with change-logging disabled. " + - "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."); + throw new UnsupportedOperationException( + "MockProcessorContext does not provide record collection. " + + "For processor unit tests, use an in-memory state store with change-logging disabled. " + + "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration." + ); } }