From dd9f70e3d1734a455effc411ce092f6847aa0778 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Mon, 2 Mar 2020 03:31:15 +0300 Subject: [PATCH] KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197) These classes are used by `upgrade_test.py` with old Kafka versions so they can only use functionality that exists in all Kafka versions. This change fixes the test for Kafka versions older than 0.11.0. Reviewers: Ismael Juma --- .../apache/kafka/tools/VerifiableConsumer.java | 13 ++++++++----- .../apache/kafka/tools/VerifiableProducer.java | 15 +++++++++------ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index b65eb9a2a7d..5be76f00062 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -43,7 +43,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -633,7 +632,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons brokerHostandPort = res.getString("brokerList"); } else { parser.printHelp(); - Exit.exit(0); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(0); } consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort); @@ -663,16 +663,19 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons ArgumentParser parser = argParser(); if (args.length == 0) { parser.printHelp(); - Exit.exit(0); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(0); } try { final VerifiableConsumer consumer = createFromArgs(parser, args); - Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close()); + // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0. + Runtime.getRuntime().addShutdownHook(new Thread(consumer::close, "verifiable-consumer-shutdown-hook")); consumer.run(); } catch (ArgumentParserException e) { parser.handleError(e); - Exit.exit(1); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(1); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index befdfddf3f7..ee863d4b175 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Exit; import java.io.IOException; import java.io.InputStream; @@ -240,7 +239,8 @@ public class VerifiableProducer implements AutoCloseable { producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList")); } else { parser.printHelp(); - Exit.exit(0); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(0); } producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -528,7 +528,8 @@ public class VerifiableProducer implements AutoCloseable { ArgumentParser parser = argParser(); if (args.length == 0) { parser.printHelp(); - Exit.exit(0); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(0); } try { @@ -537,7 +538,8 @@ public class VerifiableProducer implements AutoCloseable { final long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs); - Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> { + // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0. + Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Trigger main thread to stop producing messages producer.stopProducing = true; @@ -549,12 +551,13 @@ public class VerifiableProducer implements AutoCloseable { double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs)); producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput)); - }); + }, "verifiable-producer-shutdown-hook")); producer.run(throttler); } catch (ArgumentParserException e) { parser.handleError(e); - Exit.exit(1); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(1); } }