mirror of https://github.com/apache/kafka.git
KAFKA-19011 Improve EndToEndLatency Tool with argument parser and message key/header support (#20301)
jira: https://issues.apache.org/jira/browse/KAFKA-19011 kip: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1172%3A+Improve+EndToEndLatency+tool This PR improves the usability and maintainability of the `kafka-e2e-latency.sh` tool: - Replaces fixed-index argument parsing with a proper argument parser (joptsimple) - Adds support for configuring: - -record-key-size: size of the message key - -num-headers: number of headers per message - -record-header-key-size: size of each header key - -record-header-size: size of each header value - Renames existing arguments to align with Kafka CLI conventions: - broker_list → bootstrap-server - num_messages → num-records - message_size_bytes → record-size - properties_file → command-config - Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
6247fd9eb3
commit
ef10a52a52
|
@ -21,19 +21,25 @@ import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
import org.apache.kafka.clients.admin.Admin;
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
import org.apache.kafka.clients.admin.NewTopic;
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.header.Header;
|
||||||
import org.apache.kafka.common.utils.Exit;
|
import org.apache.kafka.common.utils.Exit;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
import org.apache.kafka.server.util.CommandDefaultOptions;
|
||||||
|
import org.apache.kafka.server.util.CommandLineUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
@ -42,16 +48,19 @@ import java.util.Set;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import joptsimple.OptionException;
|
||||||
|
import joptsimple.OptionSpec;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class records the average end to end latency for a single message to travel through Kafka.
|
* This class records the average end to end latency for a single message to travel through Kafka.
|
||||||
* Following are the required arguments
|
* Following are the required arguments
|
||||||
* <p> broker_list = location of the bootstrap broker for both the producer and the consumer </p>
|
* <p> --bootstrap-server = location of the bootstrap broker for both the producer and the consumer
|
||||||
* <p> topic = topic name used by both the producer and the consumer to send/receive messages </p>
|
* <p> --topic = topic name used by both the producer and the consumer to send/receive messages
|
||||||
* <p> num_messages = # messages to send </p>
|
* <p> --num-records = # messages to send
|
||||||
* <p> producer_acks = See ProducerConfig.ACKS_DOC </p>
|
* <p> --producer-acks = See ProducerConfig.ACKS_DOC
|
||||||
* <p> message_size_bytes = size of each message in bytes </p>
|
* <p> --record-size = size of each message value in bytes
|
||||||
*
|
*
|
||||||
* <p> e.g. [localhost:9092 test 10000 1 20] </p>
|
* <p> e.g. [./bin/kafka-e2e-latency.sh --bootstrap-server localhost:9092 --topic test-topic --num-records 1000 --producer-acks 1 --record-size 512]
|
||||||
*/
|
*/
|
||||||
public class EndToEndLatency {
|
public class EndToEndLatency {
|
||||||
private static final long POLL_TIMEOUT_MS = 60000;
|
private static final long POLL_TIMEOUT_MS = 60000;
|
||||||
|
@ -77,22 +86,23 @@ public class EndToEndLatency {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
static void execute(String... args) throws Exception {
|
static void execute(String[] args) throws Exception {
|
||||||
if (args.length != 5 && args.length != 6) {
|
String[] processedArgs = convertLegacyArgsIfNeeded(args);
|
||||||
throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
|
EndToEndLatencyCommandOptions opts = new EndToEndLatencyCommandOptions(processedArgs);
|
||||||
+ " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
|
|
||||||
}
|
|
||||||
|
|
||||||
String brokers = args[0];
|
// required
|
||||||
String topic = args[1];
|
String brokers = opts.options.valueOf(opts.bootstrapServerOpt);
|
||||||
int numMessages = Integer.parseInt(args[2]);
|
String topic = opts.options.valueOf(opts.topicOpt);
|
||||||
String acks = args[3];
|
int numRecords = opts.options.valueOf(opts.numRecordsOpt);
|
||||||
int messageSizeBytes = Integer.parseInt(args[4]);
|
String acks = opts.options.valueOf(opts.acksOpt);
|
||||||
Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
|
int recordValueSize = opts.options.valueOf(opts.recordSizeOpt);
|
||||||
|
|
||||||
if (!List.of("1", "all").contains(acks)) {
|
// optional
|
||||||
throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
|
Optional<String> propertiesFile = Optional.ofNullable(opts.options.valueOf(opts.commandConfigOpt));
|
||||||
}
|
int recordKeySize = opts.options.valueOf(opts.recordKeySizeOpt);
|
||||||
|
int numHeaders = opts.options.valueOf(opts.numHeadersOpt);
|
||||||
|
int headerKeySize = opts.options.valueOf(opts.recordHeaderKeySizeOpt);
|
||||||
|
int headerValueSize = opts.options.valueOf(opts.recordHeaderValueSizeOpt);
|
||||||
|
|
||||||
try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
|
try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
|
||||||
KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
|
KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
|
||||||
|
@ -102,18 +112,21 @@ public class EndToEndLatency {
|
||||||
}
|
}
|
||||||
setupConsumer(topic, consumer);
|
setupConsumer(topic, consumer);
|
||||||
double totalTime = 0.0;
|
double totalTime = 0.0;
|
||||||
long[] latencies = new long[numMessages];
|
long[] latencies = new long[numRecords];
|
||||||
Random random = new Random(0);
|
Random random = new Random(0);
|
||||||
|
|
||||||
for (int i = 0; i < numMessages; i++) {
|
for (int i = 0; i < numRecords; i++) {
|
||||||
byte[] message = randomBytesOfLen(random, messageSizeBytes);
|
byte[] recordKey = randomBytesOfLen(random, recordKeySize);
|
||||||
|
byte[] recordValue = randomBytesOfLen(random, recordValueSize);
|
||||||
|
List<Header> headers = generateHeadersWithSeparateSizes(random, numHeaders, headerKeySize, headerValueSize);
|
||||||
|
|
||||||
long begin = System.nanoTime();
|
long begin = System.nanoTime();
|
||||||
//Send message (of random bytes) synchronously then immediately poll for it
|
//Send message (of random bytes) synchronously then immediately poll for it
|
||||||
producer.send(new ProducerRecord<>(topic, message)).get();
|
producer.send(new ProducerRecord<>(topic, null, recordKey, recordValue, headers)).get();
|
||||||
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
|
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
|
||||||
long elapsed = System.nanoTime() - begin;
|
long elapsed = System.nanoTime() - begin;
|
||||||
|
|
||||||
validate(consumer, message, records);
|
validate(consumer, recordValue, records, recordKey, headers);
|
||||||
|
|
||||||
//Report progress
|
//Report progress
|
||||||
if (i % 1000 == 0)
|
if (i % 1000 == 0)
|
||||||
|
@ -122,35 +135,99 @@ public class EndToEndLatency {
|
||||||
latencies[i] = elapsed / 1000 / 1000;
|
latencies[i] = elapsed / 1000 / 1000;
|
||||||
}
|
}
|
||||||
|
|
||||||
printResults(numMessages, totalTime, latencies);
|
printResults(numRecords, totalTime, latencies);
|
||||||
consumer.commitSync();
|
consumer.commitSync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
|
static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] sentRecordValue, ConsumerRecords<byte[], byte[]> records, byte[] sentRecordKey, Iterable<Header> sentHeaders) {
|
||||||
if (records.isEmpty()) {
|
if (records.isEmpty()) {
|
||||||
consumer.commitSync();
|
commitAndThrow(consumer, "poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "ms])");
|
||||||
throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check result matches the original record
|
ConsumerRecord<byte[], byte[]> record = records.iterator().next();
|
||||||
String sent = new String(message, StandardCharsets.UTF_8);
|
String sent = new String(sentRecordValue, StandardCharsets.UTF_8);
|
||||||
String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
|
String read = new String(record.value(), StandardCharsets.UTF_8);
|
||||||
|
|
||||||
if (!read.equals(sent)) {
|
if (!read.equals(sent)) {
|
||||||
consumer.commitSync();
|
commitAndThrow(consumer, "The message value read [" + read + "] did not match the message value sent [" + sent + "]");
|
||||||
throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sentRecordKey != null) {
|
||||||
|
if (record.key() == null) {
|
||||||
|
commitAndThrow(consumer, "Expected message key but received null");
|
||||||
|
}
|
||||||
|
String sentKey = new String(sentRecordKey, StandardCharsets.UTF_8);
|
||||||
|
String readKey = new String(record.key(), StandardCharsets.UTF_8);
|
||||||
|
if (!readKey.equals(sentKey)) {
|
||||||
|
commitAndThrow(consumer, "The message key read [" + readKey + "] did not match the message key sent [" + sentKey + "]");
|
||||||
|
}
|
||||||
|
} else if (record.key() != null) {
|
||||||
|
commitAndThrow(consumer, "Expected null message key but received [" + new String(record.key(), StandardCharsets.UTF_8) + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
validateHeaders(consumer, sentHeaders, record);
|
||||||
|
|
||||||
//Check we only got the one message
|
//Check we only got the one message
|
||||||
if (records.count() != 1) {
|
if (records.count() != 1) {
|
||||||
int count = records.count();
|
int count = records.count();
|
||||||
consumer.commitSync();
|
commitAndThrow(consumer, "Only one result was expected during this test. We found [" + count + "]");
|
||||||
throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void commitAndThrow(KafkaConsumer<byte[], byte[]> consumer, String message) {
|
||||||
|
consumer.commitSync();
|
||||||
|
throw new RuntimeException(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void validateHeaders(KafkaConsumer<byte[], byte[]> consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) {
|
||||||
|
if (sentHeaders != null && sentHeaders.iterator().hasNext()) {
|
||||||
|
if (!record.headers().iterator().hasNext()) {
|
||||||
|
commitAndThrow(consumer, "Expected message headers but received none");
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<Header> sentIterator = sentHeaders.iterator();
|
||||||
|
Iterator<Header> receivedIterator = record.headers().iterator();
|
||||||
|
|
||||||
|
while (sentIterator.hasNext() && receivedIterator.hasNext()) {
|
||||||
|
Header sentHeader = sentIterator.next();
|
||||||
|
Header receivedHeader = receivedIterator.next();
|
||||||
|
if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) {
|
||||||
|
String receivedValueStr = receivedHeader.value() == null ? "null" : Arrays.toString(receivedHeader.value());
|
||||||
|
String sentValueStr = sentHeader.value() == null ? "null" : Arrays.toString(sentHeader.value());
|
||||||
|
commitAndThrow(consumer, "The message header read [" + receivedHeader.key() + ":" + receivedValueStr +
|
||||||
|
"] did not match the message header sent [" + sentHeader.key() + ":" + sentValueStr + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sentIterator.hasNext() || receivedIterator.hasNext()) {
|
||||||
|
commitAndThrow(consumer, "Header count mismatch between sent and received messages");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Header> generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) {
|
||||||
|
List<Header> headers = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < numHeaders; i++) {
|
||||||
|
String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8);
|
||||||
|
byte[] headerValue = valueSize == -1 ? null : randomBytesOfLen(random, valueSize);
|
||||||
|
headers.add(new Header() {
|
||||||
|
@Override
|
||||||
|
public String key() {
|
||||||
|
return headerKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] value() {
|
||||||
|
return headerValue;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
|
private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
|
||||||
List<TopicPartition> topicPartitions = consumer
|
List<TopicPartition> topicPartitions = consumer
|
||||||
.partitionsFor(topic)
|
.partitionsFor(topic)
|
||||||
|
@ -162,8 +239,8 @@ public class EndToEndLatency {
|
||||||
consumer.assignment().forEach(consumer::position);
|
consumer.assignment().forEach(consumer::position);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void printResults(int numMessages, double totalTime, long[] latencies) {
|
private static void printResults(int numRecords, double totalTime, long[] latencies) {
|
||||||
System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
|
System.out.printf("Avg latency: %.4f ms%n", totalTime / numRecords / 1000.0 / 1000.0);
|
||||||
Arrays.sort(latencies);
|
Arrays.sort(latencies);
|
||||||
int p50 = (int) latencies[(int) (latencies.length * 0.5)];
|
int p50 = (int) latencies[(int) (latencies.length * 0.5)];
|
||||||
int p99 = (int) latencies[(int) (latencies.length * 0.99)];
|
int p99 = (int) latencies[(int) (latencies.length * 0.99)];
|
||||||
|
@ -221,4 +298,173 @@ public class EndToEndLatency {
|
||||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
||||||
return new KafkaProducer<>(producerProps);
|
return new KafkaProducer<>(producerProps);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts legacy positional arguments to named arguments for backward compatibility.
|
||||||
|
*
|
||||||
|
* @param args the command line arguments to convert
|
||||||
|
* @return converted named arguments
|
||||||
|
* @throws Exception if the legacy arguments are invalid
|
||||||
|
* @deprecated Positional argument usage is deprecated and will be removed in Apache Kafka 5.0.
|
||||||
|
* Use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config
|
||||||
|
*/
|
||||||
|
@Deprecated(since = "4.2", forRemoval = true)
|
||||||
|
static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
|
||||||
|
if (args.length == 0) {
|
||||||
|
return args;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg ->
|
||||||
|
arg.equals("--bootstrap-server") ||
|
||||||
|
arg.equals("--topic") ||
|
||||||
|
arg.equals("--num-records") ||
|
||||||
|
arg.equals("--producer-acks") ||
|
||||||
|
arg.equals("--record-size"));
|
||||||
|
if (hasRequiredNamedArgs) {
|
||||||
|
return args;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (args.length != 5 && args.length != 6) {
|
||||||
|
throw new TerseException("Invalid number of arguments. Expected 5 or 6 positional arguments, but got " + args.length + ". " +
|
||||||
|
"Usage: bootstrap-server topic num-records producer-acks record-size [optional] command-config");
|
||||||
|
}
|
||||||
|
|
||||||
|
return convertLegacyArgs(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String[] convertLegacyArgs(String[] legacyArgs) {
|
||||||
|
List<String> newArgs = new ArrayList<>();
|
||||||
|
|
||||||
|
// broker_list -> --bootstrap-server
|
||||||
|
newArgs.add("--bootstrap-server");
|
||||||
|
newArgs.add(legacyArgs[0]);
|
||||||
|
|
||||||
|
// topic -> --topic
|
||||||
|
newArgs.add("--topic");
|
||||||
|
newArgs.add(legacyArgs[1]);
|
||||||
|
|
||||||
|
// num_messages -> --num-records
|
||||||
|
newArgs.add("--num-records");
|
||||||
|
newArgs.add(legacyArgs[2]);
|
||||||
|
|
||||||
|
// producer_acks -> --producer-acks
|
||||||
|
newArgs.add("--producer-acks");
|
||||||
|
newArgs.add(legacyArgs[3]);
|
||||||
|
|
||||||
|
// message_size_bytes -> --record-size
|
||||||
|
newArgs.add("--record-size");
|
||||||
|
newArgs.add(legacyArgs[4]);
|
||||||
|
|
||||||
|
// properties_file -> --command-config
|
||||||
|
if (legacyArgs.length == 6) {
|
||||||
|
newArgs.add("--command-config");
|
||||||
|
newArgs.add(legacyArgs[5]);
|
||||||
|
}
|
||||||
|
System.out.println("WARNING: Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. " +
|
||||||
|
"Please use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config");
|
||||||
|
return newArgs.toArray(new String[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class EndToEndLatencyCommandOptions extends CommandDefaultOptions {
|
||||||
|
final OptionSpec<String> bootstrapServerOpt;
|
||||||
|
final OptionSpec<String> topicOpt;
|
||||||
|
final OptionSpec<Integer> numRecordsOpt;
|
||||||
|
final OptionSpec<String> acksOpt;
|
||||||
|
final OptionSpec<Integer> recordSizeOpt;
|
||||||
|
final OptionSpec<String> commandConfigOpt;
|
||||||
|
final OptionSpec<Integer> recordKeySizeOpt;
|
||||||
|
final OptionSpec<Integer> recordHeaderValueSizeOpt;
|
||||||
|
final OptionSpec<Integer> recordHeaderKeySizeOpt;
|
||||||
|
final OptionSpec<Integer> numHeadersOpt;
|
||||||
|
|
||||||
|
public EndToEndLatencyCommandOptions(String[] args) {
|
||||||
|
super(args);
|
||||||
|
|
||||||
|
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("bootstrap-server")
|
||||||
|
.ofType(String.class);
|
||||||
|
topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for the test.")
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("topic-name")
|
||||||
|
.ofType(String.class);
|
||||||
|
numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of messages to send.")
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("count")
|
||||||
|
.ofType(Integer.class);
|
||||||
|
acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer acknowledgements. Must be '1' or 'all'.")
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("producer-acks")
|
||||||
|
.ofType(String.class);
|
||||||
|
recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size of each message payload in bytes.")
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("bytes")
|
||||||
|
.ofType(Integer.class);
|
||||||
|
recordKeySizeOpt = parser.accepts("record-key-size", "Optional: The size of the message key in bytes. If not set, messages are sent without a key.")
|
||||||
|
.withOptionalArg()
|
||||||
|
.describedAs("bytes")
|
||||||
|
.ofType(Integer.class)
|
||||||
|
.defaultsTo(0);
|
||||||
|
recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", "Optional: The size of the message header key in bytes. Used together with record-header-size.")
|
||||||
|
.withOptionalArg()
|
||||||
|
.describedAs("bytes")
|
||||||
|
.ofType(Integer.class)
|
||||||
|
.defaultsTo(0);
|
||||||
|
recordHeaderValueSizeOpt = parser.accepts("record-header-size", "Optional: The size of message header value in bytes. Use -1 for null header value.")
|
||||||
|
.withOptionalArg()
|
||||||
|
.describedAs("bytes")
|
||||||
|
.ofType(Integer.class)
|
||||||
|
.defaultsTo(0);
|
||||||
|
numHeadersOpt = parser.accepts("num-headers", "Optional: The number of headers to include in each message.")
|
||||||
|
.withOptionalArg()
|
||||||
|
.describedAs("count")
|
||||||
|
.ofType(Integer.class)
|
||||||
|
.defaultsTo(0);
|
||||||
|
commandConfigOpt = parser.accepts("command-config", "Optional: A property file for Kafka producer/consumer/admin client configuration.")
|
||||||
|
.withOptionalArg()
|
||||||
|
.describedAs("config-file")
|
||||||
|
.ofType(String.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
options = parser.parse(args);
|
||||||
|
} catch (OptionException e) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
|
||||||
|
}
|
||||||
|
checkArgs();
|
||||||
|
}
|
||||||
|
|
||||||
|
void checkArgs() {
|
||||||
|
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures end-to-end latency in Kafka by sending messages and timing their reception.");
|
||||||
|
|
||||||
|
// check required arguments
|
||||||
|
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt);
|
||||||
|
|
||||||
|
// validate 'producer-acks'
|
||||||
|
String acksValue = options.valueOf(acksOpt);
|
||||||
|
if (!List.of("1", "all").contains(acksValue)) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, "Invalid value for --producer-acks. Latency testing requires synchronous acknowledgement. Please use '1' or 'all'.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// validate for num-records and record-size
|
||||||
|
if (options.valueOf(numRecordsOpt) <= 0) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, "Value for --num-records must be a positive integer.");
|
||||||
|
}
|
||||||
|
if (options.valueOf(recordSizeOpt) < 0) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, "Value for --record-size must be a non-negative integer.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (options.valueOf(recordKeySizeOpt) < 0) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, "Value for --record-key-size must be a non-negative integer.");
|
||||||
|
}
|
||||||
|
if (options.valueOf(recordHeaderKeySizeOpt) < 0) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, "Value for --record-header-key-size must be a non-negative integer.");
|
||||||
|
}
|
||||||
|
if (options.valueOf(recordHeaderValueSizeOpt) < -1) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, "Value for --record-header-size must be a non-negative integer or -1 for null header value.");
|
||||||
|
}
|
||||||
|
if (options.valueOf(numHeadersOpt) < 0) {
|
||||||
|
CommandLineUtils.printUsageAndExit(parser, "Value for --num-headers must be a non-negative integer.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,10 @@ package org.apache.kafka.tools;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.apache.kafka.common.header.Header;
|
||||||
|
import org.apache.kafka.common.header.Headers;
|
||||||
|
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||||
|
import org.apache.kafka.common.utils.Exit;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
@ -29,9 +33,18 @@ import org.mockito.quality.Strictness;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -39,6 +52,55 @@ import static org.mockito.Mockito.when;
|
||||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||||
public class EndToEndLatencyTest {
|
public class EndToEndLatencyTest {
|
||||||
|
|
||||||
|
private static final byte[] RECORD_VALUE = "record-sent".getBytes(StandardCharsets.UTF_8);
|
||||||
|
private static final byte[] RECORD_VALUE_DIFFERENT = "record-received".getBytes(StandardCharsets.UTF_8);
|
||||||
|
private static final byte[] RECORD_KEY = "key-sent".getBytes(StandardCharsets.UTF_8);
|
||||||
|
private static final byte[] RECORD_KEY_DIFFERENT = "key-received".getBytes(StandardCharsets.UTF_8);
|
||||||
|
private static final String HEADER_KEY = "header-key-sent";
|
||||||
|
private static final String HEADER_KEY_DIFFERENT = "header-key-received";
|
||||||
|
private static final byte[] HEADER_VALUE = "header-value-sent".getBytes(StandardCharsets.UTF_8);
|
||||||
|
private static final byte[] HEADER_VALUE_DIFFERENT = "header-value-received".getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
// legacy format test arguments
|
||||||
|
private static final String[] LEGACY_INVALID_ARGS_UNEXPECTED = {
|
||||||
|
"localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random"
|
||||||
|
};
|
||||||
|
|
||||||
|
private static class ArgsBuilder {
|
||||||
|
private final Map<String, String> params = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
private ArgsBuilder() {
|
||||||
|
params.put("--bootstrap-server", "localhost:9092");
|
||||||
|
params.put("--topic", "test-topic");
|
||||||
|
params.put("--num-records", "100");
|
||||||
|
params.put("--producer-acks", "1");
|
||||||
|
params.put("--record-size", "200");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ArgsBuilder defaults() {
|
||||||
|
return new ArgsBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArgsBuilder with(String param, String value) {
|
||||||
|
params.put(param, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String[] build() {
|
||||||
|
return params.entrySet().stream()
|
||||||
|
.flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()))
|
||||||
|
.toArray(String[]::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArgsBuilder withNegative(String param) {
|
||||||
|
return with(param, "-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArgsBuilder withZero(String param) {
|
||||||
|
return with(param, "0");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
KafkaConsumer<byte[], byte[]> consumer;
|
KafkaConsumer<byte[], byte[]> consumer;
|
||||||
|
|
||||||
|
@ -46,33 +108,176 @@ public class EndToEndLatencyTest {
|
||||||
ConsumerRecords<byte[], byte[]> records;
|
ConsumerRecords<byte[], byte[]> records;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldFailWhenSuppliedUnexpectedArgs() {
|
public void testInvalidArgs() {
|
||||||
String[] args = new String[] {"localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random"};
|
testUnexpectedArgsWithLegacyFormat();
|
||||||
assertThrows(TerseException.class, () -> EndToEndLatency.execute(args));
|
testInvalidProducerAcks();
|
||||||
|
testInvalidNumRecords();
|
||||||
|
testInvalidRecordSize();
|
||||||
|
testInvalidRecordKey();
|
||||||
|
testInvalidNumHeaders();
|
||||||
|
testInvalidRecordHeaderKey();
|
||||||
|
testInvalidRecordHeaderValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testUnexpectedArgsWithLegacyFormat() {
|
||||||
|
String expectedMsg = "Invalid number of arguments. Expected 5 or 6 positional arguments, but got 7.";
|
||||||
|
TerseException terseException = assertThrows(TerseException.class, () -> EndToEndLatency.execute(LEGACY_INVALID_ARGS_UNEXPECTED));
|
||||||
|
assertTrue(terseException.getMessage().contains(expectedMsg));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidNumRecords() {
|
||||||
|
String expectedMsg = "Value for --num-records must be a positive integer.";
|
||||||
|
assertInitializeInvalidOptionsExitCodeAndMsg(
|
||||||
|
ArgsBuilder.defaults().withNegative("--num-records").build(), expectedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidRecordSize() {
|
||||||
|
String expectedMsg = "Value for --record-size must be a non-negative integer.";
|
||||||
|
assertInitializeInvalidOptionsExitCodeAndMsg(
|
||||||
|
ArgsBuilder.defaults().withNegative("--record-size").build(), expectedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidRecordKey() {
|
||||||
|
String expectedMsg = "Value for --record-key-size must be a non-negative integer.";
|
||||||
|
assertInitializeInvalidOptionsExitCodeAndMsg(
|
||||||
|
ArgsBuilder.defaults().withNegative("--record-key-size").build(), expectedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidNumHeaders() {
|
||||||
|
String expectedMsg = "Value for --num-headers must be a non-negative integer.";
|
||||||
|
assertInitializeInvalidOptionsExitCodeAndMsg(
|
||||||
|
ArgsBuilder.defaults().withNegative("--num-headers").build(), expectedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidRecordHeaderKey() {
|
||||||
|
String expectedMsg = "Value for --record-header-key-size must be a non-negative integer.";
|
||||||
|
assertInitializeInvalidOptionsExitCodeAndMsg(
|
||||||
|
ArgsBuilder.defaults().withNegative("--record-header-key-size").build(), expectedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidRecordHeaderValue() {
|
||||||
|
String expectedMsg = "Value for --record-header-size must be a non-negative integer.";
|
||||||
|
assertInitializeInvalidOptionsExitCodeAndMsg(
|
||||||
|
ArgsBuilder.defaults().withNegative("--record-header-size").build(), expectedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidProducerAcks() {
|
||||||
|
String expectedMsg = "Invalid value for --producer-acks. Latency testing requires synchronous acknowledgement. Please use '1' or 'all'.";
|
||||||
|
assertInitializeInvalidOptionsExitCodeAndMsg(
|
||||||
|
ArgsBuilder.defaults().withZero("--producer-acks").build(), expectedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertInitializeInvalidOptionsExitCodeAndMsg(String[] args, String expectedMsg) {
|
||||||
|
Exit.setExitProcedure((exitCode, message) -> {
|
||||||
|
assertEquals(1, exitCode);
|
||||||
|
assertTrue(message.contains(expectedMsg));
|
||||||
|
throw new RuntimeException();
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
assertThrows(RuntimeException.class, () -> EndToEndLatency.execute(args));
|
||||||
|
} finally {
|
||||||
|
Exit.resetExitProcedure();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldFailWhenProducerAcksAreNotSynchronised() {
|
@SuppressWarnings("removal")
|
||||||
String[] args = new String[] {"localhost:9092", "test", "10000", "0", "200"};
|
public void testConvertLegacyArgs() throws Exception {
|
||||||
assertThrows(IllegalArgumentException.class, () -> EndToEndLatency.execute(args));
|
String[] legacyArgs = {"localhost:9092", "test", "100", "1", "200"};
|
||||||
|
String[] convertedArgs = EndToEndLatency.convertLegacyArgsIfNeeded(legacyArgs);
|
||||||
|
String[] expectedArgs = {
|
||||||
|
"--bootstrap-server", "localhost:9092",
|
||||||
|
"--topic", "test",
|
||||||
|
"--num-records", "100",
|
||||||
|
"--producer-acks", "1",
|
||||||
|
"--record-size", "200"
|
||||||
|
};
|
||||||
|
assertArrayEquals(expectedArgs, convertedArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldFailWhenConsumerRecordsIsEmpty() {
|
public void shouldFailWhenConsumerRecordsIsEmpty() {
|
||||||
when(records.isEmpty()).thenReturn(true);
|
when(records.isEmpty()).thenReturn(true);
|
||||||
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, new byte[0], records));
|
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, new byte[0], records, null, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void shouldFailWhenSentIsNotEqualToReceived() {
|
public void shouldFailWhenSentRecordIsNotEqualToReceived() {
|
||||||
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
|
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
|
||||||
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
|
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
|
||||||
when(records.isEmpty()).thenReturn(false);
|
when(records.isEmpty()).thenReturn(false);
|
||||||
when(records.iterator()).thenReturn(iterator);
|
when(records.iterator()).thenReturn(iterator);
|
||||||
when(iterator.next()).thenReturn(record);
|
when(iterator.next()).thenReturn(record);
|
||||||
when(record.value()).thenReturn("kafkab".getBytes(StandardCharsets.UTF_8));
|
when(record.value()).thenReturn(RECORD_VALUE_DIFFERENT);
|
||||||
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
|
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, null));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void shouldFailWhenSentRecordKeyIsNotEqualToReceived() {
|
||||||
|
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
|
||||||
|
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
|
||||||
|
when(records.isEmpty()).thenReturn(false);
|
||||||
|
when(records.iterator()).thenReturn(iterator);
|
||||||
|
when(iterator.next()).thenReturn(record);
|
||||||
|
when(record.value()).thenReturn(RECORD_VALUE);
|
||||||
|
when(record.key()).thenReturn(RECORD_KEY_DIFFERENT);
|
||||||
|
|
||||||
|
assertThrows(RuntimeException.class, () ->
|
||||||
|
EndToEndLatency.validate(consumer, RECORD_VALUE, records,
|
||||||
|
RECORD_KEY, null));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void shouldFailWhenSentHeaderKeyIsNotEqualToReceived() {
|
||||||
|
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
|
||||||
|
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
|
||||||
|
Headers headers = mock(Headers.class);
|
||||||
|
Iterator<Header> headerIterator = mock(Iterator.class);
|
||||||
|
Header receivedHeader = new RecordHeader(HEADER_KEY_DIFFERENT, HEADER_VALUE);
|
||||||
|
|
||||||
|
when(records.isEmpty()).thenReturn(false);
|
||||||
|
when(records.iterator()).thenReturn(iterator);
|
||||||
|
when(iterator.next()).thenReturn(record);
|
||||||
|
when(record.value()).thenReturn(RECORD_VALUE);
|
||||||
|
when(record.key()).thenReturn(null);
|
||||||
|
when(record.headers()).thenReturn(headers);
|
||||||
|
when(headers.iterator()).thenReturn(headerIterator);
|
||||||
|
when(headerIterator.hasNext()).thenReturn(true);
|
||||||
|
when(headerIterator.next()).thenReturn(receivedHeader);
|
||||||
|
|
||||||
|
Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
|
||||||
|
List<Header> sentHeaders = List.of(sentHeader);
|
||||||
|
|
||||||
|
assertThrows(RuntimeException.class, () ->
|
||||||
|
EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, sentHeaders));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void shouldFailWhenSentHeaderValueIsNotEqualToReceived() {
|
||||||
|
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
|
||||||
|
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
|
||||||
|
Headers headers = mock(Headers.class);
|
||||||
|
Iterator<Header> headerIterator = mock(Iterator.class);
|
||||||
|
Header receivedHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE_DIFFERENT);
|
||||||
|
Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
|
||||||
|
List<Header> sentHeaders = List.of(sentHeader);
|
||||||
|
|
||||||
|
when(records.isEmpty()).thenReturn(false);
|
||||||
|
when(records.iterator()).thenReturn(iterator);
|
||||||
|
when(iterator.next()).thenReturn(record);
|
||||||
|
when(record.value()).thenReturn(RECORD_VALUE);
|
||||||
|
when(record.key()).thenReturn(null);
|
||||||
|
when(record.headers()).thenReturn(headers);
|
||||||
|
when(headers.iterator()).thenReturn(headerIterator);
|
||||||
|
when(headerIterator.hasNext()).thenReturn(true);
|
||||||
|
when(headerIterator.next()).thenReturn(receivedHeader);
|
||||||
|
|
||||||
|
assertThrows(RuntimeException.class, () ->
|
||||||
|
EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, sentHeaders));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -83,9 +288,9 @@ public class EndToEndLatencyTest {
|
||||||
when(records.isEmpty()).thenReturn(false);
|
when(records.isEmpty()).thenReturn(false);
|
||||||
when(records.iterator()).thenReturn(iterator);
|
when(records.iterator()).thenReturn(iterator);
|
||||||
when(iterator.next()).thenReturn(record);
|
when(iterator.next()).thenReturn(record);
|
||||||
when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
|
when(record.value()).thenReturn(RECORD_VALUE);
|
||||||
when(records.count()).thenReturn(2);
|
when(records.count()).thenReturn(2);
|
||||||
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
|
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -93,12 +298,40 @@ public class EndToEndLatencyTest {
|
||||||
public void shouldPassInValidation() {
|
public void shouldPassInValidation() {
|
||||||
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
|
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
|
||||||
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
|
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
|
||||||
|
Headers headers = mock(Headers.class);
|
||||||
|
Iterator<Header> headerIterator = mock(Iterator.class);
|
||||||
|
Header receivedHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
|
||||||
|
Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
|
||||||
|
List<Header> sentHeaders = List.of(sentHeader);
|
||||||
|
|
||||||
when(records.isEmpty()).thenReturn(false);
|
when(records.isEmpty()).thenReturn(false);
|
||||||
when(records.iterator()).thenReturn(iterator);
|
when(records.iterator()).thenReturn(iterator);
|
||||||
when(iterator.next()).thenReturn(record);
|
when(iterator.next()).thenReturn(record);
|
||||||
when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
|
when(record.value()).thenReturn(RECORD_VALUE);
|
||||||
|
byte[] recordKey = RECORD_KEY;
|
||||||
|
when(record.key()).thenReturn(recordKey);
|
||||||
when(records.count()).thenReturn(1);
|
when(records.count()).thenReturn(1);
|
||||||
assertDoesNotThrow(() -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
|
when(record.headers()).thenReturn(headers);
|
||||||
|
when(headers.iterator()).thenReturn(headerIterator);
|
||||||
|
when(headerIterator.hasNext()).thenReturn(true, true, false);
|
||||||
|
when(headerIterator.next()).thenReturn(receivedHeader);
|
||||||
|
|
||||||
|
assertDoesNotThrow(() -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, recordKey, sentHeaders));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldPassWithNamedArgs() {
|
||||||
|
AtomicReference<Integer> exitStatus = new AtomicReference<>();
|
||||||
|
Exit.setExitProcedure((status, __) -> {
|
||||||
|
exitStatus.set(status);
|
||||||
|
throw new RuntimeException();
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
assertDoesNotThrow(() -> new EndToEndLatency.EndToEndLatencyCommandOptions(ArgsBuilder.defaults().build()));
|
||||||
|
assertNull(exitStatus.get());
|
||||||
|
} finally {
|
||||||
|
Exit.resetExitProcedure();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue