MINOR: don't log config during unit tests (#5671)

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
John Roesler 2018-09-30 18:52:55 -05:00 committed by Matthias J. Sax
parent eb61df642d
commit 49b5206a82
3 changed files with 43 additions and 11 deletions

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@ -235,7 +236,7 @@ public class TopologyTestDriver implements Closeable {
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
final StreamsConfig streamsConfig = new StreamsConfig(config);
final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
mockWallClockTime = new MockTime(initialWallClockTimeMs);
internalTopologyBuilder = builder;

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Map;
/**
* A {@link StreamsConfig} that does not log its configuration on construction.
*
* This producer cleaner output for unit tests using the {@code test-utils},
* since logging the config is not really valuable in this context.
*/
public class QuietStreamsConfig extends StreamsConfig {
public QuietStreamsConfig(final Map<?, ?> props) {
super(props, false);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@ -201,7 +202,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
final StreamsConfig streamsConfig = new StreamsConfig(config);
final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
@ -382,12 +383,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
punctuators.add(capturedPunctuator);
return new Cancellable() {
@Override
public void cancel() {
capturedPunctuator.cancel();
}
};
return capturedPunctuator::cancel;
}
/**
@ -506,8 +502,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
// This interface is assumed by state stores that add change-logging.
// Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
throw new UnsupportedOperationException("MockProcessorContext does not provide record collection. " +
"For processor unit tests, use an in-memory state store with change-logging disabled. " +
"Alternatively, use the TopologyTestDriver for testing processor/store/topology integration.");
throw new UnsupportedOperationException(
"MockProcessorContext does not provide record collection. " +
"For processor unit tests, use an in-memory state store with change-logging disabled. " +
"Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
);
}
}