KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197)

These classes are used by `upgrade_test.py` with old Kafka versions so they can
only use functionality that exists in all Kafka versions. This change fixes the test
for Kafka versions older than 0.11.0.

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Nikolay 2020-03-02 03:31:15 +03:00 committed by GitHub
parent a1f2ece323
commit c07db1c7d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 11 deletions

View File

@ -43,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -633,7 +632,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
brokerHostandPort = res.getString("brokerList");
} else {
parser.printHelp();
Exit.exit(0);
// Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
System.exit(0);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
@ -663,16 +663,19 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
ArgumentParser parser = argParser();
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
// Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
System.exit(0);
}
try {
final VerifiableConsumer consumer = createFromArgs(parser, args);
Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close());
// Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
Runtime.getRuntime().addShutdownHook(new Thread(consumer::close, "verifiable-consumer-shutdown-hook"));
consumer.run();
} catch (ArgumentParserException e) {
parser.handleError(e);
Exit.exit(1);
// Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
System.exit(1);
}
}

View File

@ -34,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import java.io.IOException;
import java.io.InputStream;
@ -240,7 +239,8 @@ public class VerifiableProducer implements AutoCloseable {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
} else {
parser.printHelp();
Exit.exit(0);
// Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
System.exit(0);
}
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@ -528,7 +528,8 @@ public class VerifiableProducer implements AutoCloseable {
ArgumentParser parser = argParser();
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
// Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
System.exit(0);
}
try {
@ -537,7 +538,8 @@ public class VerifiableProducer implements AutoCloseable {
final long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> {
// Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Trigger main thread to stop producing messages
producer.stopProducing = true;
@ -549,12 +551,13 @@ public class VerifiableProducer implements AutoCloseable {
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
});
}, "verifiable-producer-shutdown-hook"));
producer.run(throttler);
} catch (ArgumentParserException e) {
parser.handleError(e);
Exit.exit(1);
// Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
System.exit(1);
}
}