MINOR: Streams integration tests should not call exit (#9067)

- replace System.exit with Exit.exit in all relevant classes
- forbid use of System.exit in all relevant classes and add exceptions for others

Co-authored-by: John Roesler <vvcephei@apache.org>
Co-authored-by: Matthias J. Sax <matthias@confluent.io>

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Ismael Juma <ismael@confluent.io>
This commit is contained in:
John Roesler 2020-08-05 15:52:50 -05:00 committed by GitHub
parent bc883cdbcf
commit 26a217c8e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 74 additions and 40 deletions

View File

@ -103,6 +103,14 @@
<property name="ignoreComments" value="true"/>
</module>
<module name="Regexp">
<property name="id" value="dontUseSystemExit"/>
<property name="format" value="System\.exit"/>
<property name="illegalPattern" value="true"/>
<property name="ignoreComments" value="true"/>
<property name="message" value="'System.exit': Should not directly call System.exit, but Exit.exit instead."/>
</module>
<!-- code quality -->
<module name="MethodLength"/>
<module name="ParameterNumber">

View File

@ -17,8 +17,12 @@
files="(ApiMessageType).java|MessageDataGenerator.java"/>
<suppress checks="MethodLength"
files="MessageDataGenerator.java"/>
<suppress id="dontUseSystemExit"
files="MessageGenerator.java"/>
<!-- Clients -->
<suppress id="dontUseSystemExit"
files="Exit.java"/>
<suppress checks="ClassFanOutComplexity"
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin).java"/>
<suppress checks="ClassFanOutComplexity"
@ -78,7 +82,7 @@
<suppress checks="NPathComplexity"
files="MessageTest.java"/>
<!-- clients tests -->
<!-- Clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
@ -167,7 +171,7 @@
<suppress checks="FinalLocalVariable"
files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/>
<!-- generated code -->
<!-- Generated code -->
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS)"
files="streams[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
@ -195,16 +199,19 @@
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
<!-- Streams Test-Utils -->
<!-- Streams test-utils -->
<suppress checks="ClassFanOutComplexity"
files="TopologyTestDriver.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="TopologyTestDriver.java"/>
<!-- Streams examples -->
<suppress id="dontUseSystemExit"
files="PageViewTypedDemo.java|PipeDemo.java|TemperatureDemo.java|WordCountDemo.java|WordCountProcessorDemo.java|WordCountTransformerDemo.java"/>
<!-- Tools -->
<suppress checks="ClassDataAbstractionCoupling"
files="VerifiableConsumer.java"/>
<suppress checks="CyclomaticComplexity"
files="(StreamsResetter|ProducerPerformance|Agent).java"/>
<suppress checks="BooleanExpressionComplexity"
@ -219,6 +226,10 @@
files="ProduceBenchSpec.java"/>
<suppress checks="ParameterNumber"
files="SustainedConnectionSpec.java"/>
<suppress id="dontUseSystemExit"
files="VerifiableConsumer.java"/>
<suppress id="dontUseSystemExit"
files="VerifiableProducer.java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.examples.pageview;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
@ -37,6 +36,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.examples.wordcount;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
@ -31,6 +30,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@ -80,6 +81,12 @@ public class SmokeTestDriverIntegrationTest {
@Test
public void shouldWorkWithRebalance() throws InterruptedException {
Exit.setExitProcedure((statusCode, message) -> {
throw new AssertionError("Test called exit(). code:" + statusCode + " message:" + message);
});
Exit.setHaltProcedure((statusCode, message) -> {
throw new AssertionError("Test called halt(). code:" + statusCode + " message:" + message);
});
int numClientsCreated = 0;
final ArrayList<SmokeTestClient> clients = new ArrayList<>();

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -49,7 +50,7 @@ public class BrokerCompatibilityTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("BrokerCompatibilityTest are expecting two parameters: propFile, processingMode; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
System.out.println("StreamsTest instance started");
@ -62,7 +63,7 @@ public class BrokerCompatibilityTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility");

View File

@ -49,7 +49,7 @@ public class StreamsBrokerDownResilienceTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
System.out.println("StreamsTest instance started");
@ -62,7 +62,7 @@ public class StreamsBrokerDownResilienceTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience");
@ -86,7 +86,7 @@ public class StreamsBrokerDownResilienceTest {
StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
System.exit(1);
Exit.exit(1);
}
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -31,7 +32,7 @@ public class StreamsEosTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
final String propFileName = args[0];
@ -43,7 +44,7 @@ public class StreamsEosTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
if ("process".equals(command) || "process-complex".equals(command)) {
@ -51,7 +52,7 @@ public class StreamsEosTest {
!StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee)) {
System.err.println("processingGuarantee must be either " + StreamsConfig.EXACTLY_ONCE + " or " + StreamsConfig.EXACTLY_ONCE_BETA);
System.exit(1);
Exit.exit(1);
}
}
@ -62,7 +63,7 @@ public class StreamsEosTest {
System.out.flush();
if (command == null || propFileName == null) {
System.exit(-1);
Exit.exit(-1);
}
switch (command) {
@ -84,7 +85,7 @@ public class StreamsEosTest {
default:
System.out.println("unknown command: " + command);
System.out.flush();
System.exit(-1);
Exit.exit(-1);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
final String propFileName = args[0];
@ -53,7 +54,7 @@ public class StreamsSmokeTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
if ("process".equals(command)) {
@ -64,7 +65,7 @@ public class StreamsSmokeTest {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + ", " +
StreamsConfig.EXACTLY_ONCE + ", or " + StreamsConfig.EXACTLY_ONCE_BETA);
System.exit(1);
Exit.exit(1);
}
}

View File

@ -48,7 +48,7 @@ public class StreamsStandByReplicaTest {
if (args.length < 2) {
System.err.println("StreamsStandByReplicaTest are expecting two parameters: " +
"propFile, additionalConfigs; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
System.out.println("StreamsTest instance started");
@ -61,7 +61,7 @@ public class StreamsStandByReplicaTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
@ -75,7 +75,7 @@ public class StreamsStandByReplicaTest {
if (additionalConfigs == null) {
System.err.println("additional configs are not provided");
System.err.flush();
System.exit(1);
Exit.exit(1);
}
final Map<String, String> updated = SystemTestUtil.parseConfigs(additionalConfigs);
@ -92,7 +92,7 @@ public class StreamsStandByReplicaTest {
sinkTopic1,
sinkTopic2));
System.err.flush();
System.exit(1);
Exit.exit(1);
}
streamsProperties.putAll(updated);
@ -104,7 +104,7 @@ public class StreamsStandByReplicaTest {
StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
System.exit(1);
Exit.exit(1);
}
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
final String propFileName = args[0];
@ -53,7 +54,7 @@ public class StreamsSmokeTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
if ("process".equals(command)) {
@ -63,7 +64,7 @@ public class StreamsSmokeTest {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);
System.exit(1);
Exit.exit(1);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
final String propFileName = args[0];
@ -53,7 +54,7 @@ public class StreamsSmokeTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
if ("process".equals(command)) {
@ -63,7 +64,7 @@ public class StreamsSmokeTest {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);
System.exit(1);
Exit.exit(1);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
final String propFileName = args[0];
@ -53,7 +54,7 @@ public class StreamsSmokeTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
if ("process".equals(command)) {
@ -63,7 +64,7 @@ public class StreamsSmokeTest {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);
System.exit(1);
Exit.exit(1);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
final String propFileName = args[0];
@ -53,7 +54,7 @@ public class StreamsSmokeTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
if ("process".equals(command)) {
@ -63,7 +64,7 @@ public class StreamsSmokeTest {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);
System.exit(1);
Exit.exit(1);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}
final String propFileName = args[0];
@ -53,7 +54,7 @@ public class StreamsSmokeTest {
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}
if ("process".equals(command)) {
@ -63,7 +64,7 @@ public class StreamsSmokeTest {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);
System.exit(1);
Exit.exit(1);
}
}

View File

@ -382,6 +382,6 @@ public class TransactionalMessageCopier {
consumer.close();
}
}
System.exit(0);
Exit.exit(0);
}
}