diff --git a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java index c6914b4667b..486e4f57618 100644 --- a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java +++ b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java @@ -21,19 +21,25 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -42,16 +48,19 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import joptsimple.OptionException; +import joptsimple.OptionSpec; + /** * This class records the average end to end latency for a single message to travel through Kafka. * Following are the required arguments - *
<p> broker_list = location of the bootstrap broker for both the producer and the consumer
- * <p> topic = topic name used by both the producer and the consumer to send/receive messages
- * <p> num_messages = # messages to send
- * <p> producer_acks = See ProducerConfig.ACKS_DOC
- * <p> message_size_bytes = size of each message in bytes
+ * <p> --bootstrap-server = location of the bootstrap broker for both the producer and the consumer
+ * <p> --topic = topic name used by both the producer and the consumer to send/receive messages
+ * <p> --num-records = # messages to send
+ * <p> --producer-acks = See ProducerConfig.ACKS_DOC
+ * <p> --record-size = size of each message value in bytes
 *
- * <p> e.g. [localhost:9092 test 10000 1 20]
+ * <p>
e.g. [./bin/kafka-e2e-latency.sh --bootstrap-server localhost:9092 --topic test-topic --num-records 1000 --producer-acks 1 --record-size 512] */ public class EndToEndLatency { private static final long POLL_TIMEOUT_MS = 60000; @@ -77,22 +86,23 @@ public class EndToEndLatency { } // Visible for testing - static void execute(String... args) throws Exception { - if (args.length != 5 && args.length != 6) { - throw new TerseException("USAGE: java " + EndToEndLatency.class.getName() - + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); - } + static void execute(String[] args) throws Exception { + String[] processedArgs = convertLegacyArgsIfNeeded(args); + EndToEndLatencyCommandOptions opts = new EndToEndLatencyCommandOptions(processedArgs); - String brokers = args[0]; - String topic = args[1]; - int numMessages = Integer.parseInt(args[2]); - String acks = args[3]; - int messageSizeBytes = Integer.parseInt(args[4]); - Optional propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty(); + // required + String brokers = opts.options.valueOf(opts.bootstrapServerOpt); + String topic = opts.options.valueOf(opts.topicOpt); + int numRecords = opts.options.valueOf(opts.numRecordsOpt); + String acks = opts.options.valueOf(opts.acksOpt); + int recordValueSize = opts.options.valueOf(opts.recordSizeOpt); - if (!List.of("1", "all").contains(acks)) { - throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all"); - } + // optional + Optional propertiesFile = Optional.ofNullable(opts.options.valueOf(opts.commandConfigOpt)); + int recordKeySize = opts.options.valueOf(opts.recordKeySizeOpt); + int numHeaders = opts.options.valueOf(opts.numHeadersOpt); + int headerKeySize = opts.options.valueOf(opts.recordHeaderKeySizeOpt); + int headerValueSize = opts.options.valueOf(opts.recordHeaderValueSizeOpt); try (KafkaConsumer consumer = createKafkaConsumer(propertiesFile, brokers); KafkaProducer producer = createKafkaProducer(propertiesFile, brokers, acks)) { @@ -102,18 +112,21 @@ public class EndToEndLatency { } setupConsumer(topic, consumer); double totalTime = 0.0; - long[] latencies = new long[numMessages]; + long[] latencies = new long[numRecords]; Random random = new Random(0); - for (int i = 0; i < numMessages; i++) { - byte[] message = randomBytesOfLen(random, messageSizeBytes); + for (int i = 0; i < numRecords; i++) { + byte[] recordKey = randomBytesOfLen(random, recordKeySize); + byte[] recordValue = randomBytesOfLen(random, recordValueSize); + List
<Header>
headers = generateHeadersWithSeparateSizes(random, numHeaders, headerKeySize, headerValueSize); + long begin = System.nanoTime(); //Send message (of random bytes) synchronously then immediately poll for it - producer.send(new ProducerRecord<>(topic, message)).get(); + producer.send(new ProducerRecord<>(topic, null, recordKey, recordValue, headers)).get(); ConsumerRecords records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); long elapsed = System.nanoTime() - begin; - validate(consumer, message, records); + validate(consumer, recordValue, records, recordKey, headers); //Report progress if (i % 1000 == 0) @@ -122,35 +135,99 @@ public class EndToEndLatency { latencies[i] = elapsed / 1000 / 1000; } - printResults(numMessages, totalTime, latencies); + printResults(numRecords, totalTime, latencies); consumer.commitSync(); } } // Visible for testing - static void validate(KafkaConsumer consumer, byte[] message, ConsumerRecords records) { + static void validate(KafkaConsumer consumer, byte[] sentRecordValue, ConsumerRecords records, byte[] sentRecordKey, Iterable
<Header>
sentHeaders) { if (records.isEmpty()) { - consumer.commitSync(); - throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])"); + commitAndThrow(consumer, "poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "ms])"); } - //Check result matches the original record - String sent = new String(message, StandardCharsets.UTF_8); - String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8); + ConsumerRecord record = records.iterator().next(); + String sent = new String(sentRecordValue, StandardCharsets.UTF_8); + String read = new String(record.value(), StandardCharsets.UTF_8); if (!read.equals(sent)) { - consumer.commitSync(); - throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]"); + commitAndThrow(consumer, "The message value read [" + read + "] did not match the message value sent [" + sent + "]"); } + if (sentRecordKey != null) { + if (record.key() == null) { + commitAndThrow(consumer, "Expected message key but received null"); + } + String sentKey = new String(sentRecordKey, StandardCharsets.UTF_8); + String readKey = new String(record.key(), StandardCharsets.UTF_8); + if (!readKey.equals(sentKey)) { + commitAndThrow(consumer, "The message key read [" + readKey + "] did not match the message key sent [" + sentKey + "]"); + } + } else if (record.key() != null) { + commitAndThrow(consumer, "Expected null message key but received [" + new String(record.key(), StandardCharsets.UTF_8) + "]"); + } + + validateHeaders(consumer, sentHeaders, record); + //Check we only got the one message if (records.count() != 1) { int count = records.count(); - consumer.commitSync(); - throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]"); + commitAndThrow(consumer, "Only one result was expected during this test. We found [" + count + "]"); } } + private static void commitAndThrow(KafkaConsumer consumer, String message) { + consumer.commitSync(); + throw new RuntimeException(message); + } + + private static void validateHeaders(KafkaConsumer consumer, Iterable
<Header>
sentHeaders, ConsumerRecord record) { + if (sentHeaders != null && sentHeaders.iterator().hasNext()) { + if (!record.headers().iterator().hasNext()) { + commitAndThrow(consumer, "Expected message headers but received none"); + } + + Iterator
<Header>
sentIterator = sentHeaders.iterator(); + Iterator
<Header>
receivedIterator = record.headers().iterator(); + + while (sentIterator.hasNext() && receivedIterator.hasNext()) { + Header sentHeader = sentIterator.next(); + Header receivedHeader = receivedIterator.next(); + if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) { + String receivedValueStr = receivedHeader.value() == null ? "null" : Arrays.toString(receivedHeader.value()); + String sentValueStr = sentHeader.value() == null ? "null" : Arrays.toString(sentHeader.value()); + commitAndThrow(consumer, "The message header read [" + receivedHeader.key() + ":" + receivedValueStr + + "] did not match the message header sent [" + sentHeader.key() + ":" + sentValueStr + "]"); + } + } + + if (sentIterator.hasNext() || receivedIterator.hasNext()) { + commitAndThrow(consumer, "Header count mismatch between sent and received messages"); + } + } + } + + private static List
<Header>
generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) { + List
headers = new ArrayList<>(); + + for (int i = 0; i < numHeaders; i++) { + String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8); + byte[] headerValue = valueSize == -1 ? null : randomBytesOfLen(random, valueSize); + headers.add(new Header() { + @Override + public String key() { + return headerKey; + } + + @Override + public byte[] value() { + return headerValue; + } + }); + } + return headers; + } + private static void setupConsumer(String topic, KafkaConsumer consumer) { List topicPartitions = consumer .partitionsFor(topic) @@ -162,8 +239,8 @@ public class EndToEndLatency { consumer.assignment().forEach(consumer::position); } - private static void printResults(int numMessages, double totalTime, long[] latencies) { - System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0); + private static void printResults(int numRecords, double totalTime, long[] latencies) { + System.out.printf("Avg latency: %.4f ms%n", totalTime / numRecords / 1000.0 / 1000.0); Arrays.sort(latencies); int p50 = (int) latencies[(int) (latencies.length * 0.5)]; int p99 = (int) latencies[(int) (latencies.length * 0.99)]; @@ -221,4 +298,173 @@ public class EndToEndLatency { producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); return new KafkaProducer<>(producerProps); } + + /** + * Converts legacy positional arguments to named arguments for backward compatibility. + * + * @param args the command line arguments to convert + * @return converted named arguments + * @throws Exception if the legacy arguments are invalid + * @deprecated Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. + * Use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config + */ + @Deprecated(since = "4.2", forRemoval = true) + static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception { + if (args.length == 0) { + return args; + } + + boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> + arg.equals("--bootstrap-server") || + arg.equals("--topic") || + arg.equals("--num-records") || + arg.equals("--producer-acks") || + arg.equals("--record-size")); + if (hasRequiredNamedArgs) { + return args; + } + + if (args.length != 5 && args.length != 6) { + throw new TerseException("Invalid number of arguments. Expected 5 or 6 positional arguments, but got " + args.length + ". " + + "Usage: bootstrap-server topic num-records producer-acks record-size [optional] command-config"); + } + + return convertLegacyArgs(args); + } + + private static String[] convertLegacyArgs(String[] legacyArgs) { + List newArgs = new ArrayList<>(); + + // broker_list -> --bootstrap-server + newArgs.add("--bootstrap-server"); + newArgs.add(legacyArgs[0]); + + // topic -> --topic + newArgs.add("--topic"); + newArgs.add(legacyArgs[1]); + + // num_messages -> --num-records + newArgs.add("--num-records"); + newArgs.add(legacyArgs[2]); + + // producer_acks -> --producer-acks + newArgs.add("--producer-acks"); + newArgs.add(legacyArgs[3]); + + // message_size_bytes -> --record-size + newArgs.add("--record-size"); + newArgs.add(legacyArgs[4]); + + // properties_file -> --command-config + if (legacyArgs.length == 6) { + newArgs.add("--command-config"); + newArgs.add(legacyArgs[5]); + } + System.out.println("WARNING: Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. 
" + + "Please use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config"); + return newArgs.toArray(new String[0]); + } + + public static final class EndToEndLatencyCommandOptions extends CommandDefaultOptions { + final OptionSpec bootstrapServerOpt; + final OptionSpec topicOpt; + final OptionSpec numRecordsOpt; + final OptionSpec acksOpt; + final OptionSpec recordSizeOpt; + final OptionSpec commandConfigOpt; + final OptionSpec recordKeySizeOpt; + final OptionSpec recordHeaderValueSizeOpt; + final OptionSpec recordHeaderKeySizeOpt; + final OptionSpec numHeadersOpt; + + public EndToEndLatencyCommandOptions(String[] args) { + super(args); + + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.") + .withRequiredArg() + .describedAs("bootstrap-server") + .ofType(String.class); + topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for the test.") + .withRequiredArg() + .describedAs("topic-name") + .ofType(String.class); + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of messages to send.") + .withRequiredArg() + .describedAs("count") + .ofType(Integer.class); + acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer acknowledgements. Must be '1' or 'all'.") + .withRequiredArg() + .describedAs("producer-acks") + .ofType(String.class); + recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size of each message payload in bytes.") + .withRequiredArg() + .describedAs("bytes") + .ofType(Integer.class); + recordKeySizeOpt = parser.accepts("record-key-size", "Optional: The size of the message key in bytes. If not set, messages are sent without a key.") + .withOptionalArg() + .describedAs("bytes") + .ofType(Integer.class) + .defaultsTo(0); + recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", "Optional: The size of the message header key in bytes. Used together with record-header-size.") + .withOptionalArg() + .describedAs("bytes") + .ofType(Integer.class) + .defaultsTo(0); + recordHeaderValueSizeOpt = parser.accepts("record-header-size", "Optional: The size of message header value in bytes. Use -1 for null header value.") + .withOptionalArg() + .describedAs("bytes") + .ofType(Integer.class) + .defaultsTo(0); + numHeadersOpt = parser.accepts("num-headers", "Optional: The number of headers to include in each message.") + .withOptionalArg() + .describedAs("count") + .ofType(Integer.class) + .defaultsTo(0); + commandConfigOpt = parser.accepts("command-config", "Optional: A property file for Kafka producer/consumer/admin client configuration.") + .withOptionalArg() + .describedAs("config-file") + .ofType(String.class); + + try { + options = parser.parse(args); + } catch (OptionException e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); + } + checkArgs(); + } + + void checkArgs() { + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures end-to-end latency in Kafka by sending messages and timing their reception."); + + // check required arguments + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt); + + // validate 'producer-acks' + String acksValue = options.valueOf(acksOpt); + if (!List.of("1", "all").contains(acksValue)) { + CommandLineUtils.printUsageAndExit(parser, "Invalid value for --producer-acks. Latency testing requires synchronous acknowledgement. 
Please use '1' or 'all'."); + } + + // validate for num-records and record-size + if (options.valueOf(numRecordsOpt) <= 0) { + CommandLineUtils.printUsageAndExit(parser, "Value for --num-records must be a positive integer."); + } + if (options.valueOf(recordSizeOpt) < 0) { + CommandLineUtils.printUsageAndExit(parser, "Value for --record-size must be a non-negative integer."); + } + + if (options.valueOf(recordKeySizeOpt) < 0) { + CommandLineUtils.printUsageAndExit(parser, "Value for --record-key-size must be a non-negative integer."); + } + if (options.valueOf(recordHeaderKeySizeOpt) < 0) { + CommandLineUtils.printUsageAndExit(parser, "Value for --record-header-key-size must be a non-negative integer."); + } + if (options.valueOf(recordHeaderValueSizeOpt) < -1) { + CommandLineUtils.printUsageAndExit(parser, "Value for --record-header-size must be a non-negative integer or -1 for null header value."); + } + if (options.valueOf(numHeadersOpt) < 0) { + CommandLineUtils.printUsageAndExit(parser, "Value for --num-headers must be a non-negative integer."); + } + } + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java b/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java index 47ed7dd6666..e6f662b1d99 100644 --- a/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java @@ -19,6 +19,10 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.utils.Exit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -29,9 +33,18 @@ import org.mockito.quality.Strictness; import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -39,6 +52,55 @@ import static org.mockito.Mockito.when; @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class EndToEndLatencyTest { + private static final byte[] RECORD_VALUE = "record-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_VALUE_DIFFERENT = "record-received".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_KEY = "key-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_KEY_DIFFERENT = "key-received".getBytes(StandardCharsets.UTF_8); + private static final String HEADER_KEY = "header-key-sent"; + private static final String HEADER_KEY_DIFFERENT = "header-key-received"; + private static final byte[] HEADER_VALUE = "header-value-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] HEADER_VALUE_DIFFERENT = 
"header-value-received".getBytes(StandardCharsets.UTF_8); + + // legacy format test arguments + private static final String[] LEGACY_INVALID_ARGS_UNEXPECTED = { + "localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random" + }; + + private static class ArgsBuilder { + private final Map params = new LinkedHashMap<>(); + + private ArgsBuilder() { + params.put("--bootstrap-server", "localhost:9092"); + params.put("--topic", "test-topic"); + params.put("--num-records", "100"); + params.put("--producer-acks", "1"); + params.put("--record-size", "200"); + } + + public static ArgsBuilder defaults() { + return new ArgsBuilder(); + } + + public ArgsBuilder with(String param, String value) { + params.put(param, value); + return this; + } + + public String[] build() { + return params.entrySet().stream() + .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())) + .toArray(String[]::new); + } + + public ArgsBuilder withNegative(String param) { + return with(param, "-1"); + } + + public ArgsBuilder withZero(String param) { + return with(param, "0"); + } + } + @Mock KafkaConsumer consumer; @@ -46,33 +108,176 @@ public class EndToEndLatencyTest { ConsumerRecords records; @Test - public void shouldFailWhenSuppliedUnexpectedArgs() { - String[] args = new String[] {"localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random"}; - assertThrows(TerseException.class, () -> EndToEndLatency.execute(args)); + public void testInvalidArgs() { + testUnexpectedArgsWithLegacyFormat(); + testInvalidProducerAcks(); + testInvalidNumRecords(); + testInvalidRecordSize(); + testInvalidRecordKey(); + testInvalidNumHeaders(); + testInvalidRecordHeaderKey(); + testInvalidRecordHeaderValue(); + } + + private void testUnexpectedArgsWithLegacyFormat() { + String expectedMsg = "Invalid number of arguments. 
Expected 5 or 6 positional arguments, but got 7."; + TerseException terseException = assertThrows(TerseException.class, () -> EndToEndLatency.execute(LEGACY_INVALID_ARGS_UNEXPECTED)); + assertTrue(terseException.getMessage().contains(expectedMsg)); + } + + private void testInvalidNumRecords() { + String expectedMsg = "Value for --num-records must be a positive integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--num-records").build(), expectedMsg); + } + + private void testInvalidRecordSize() { + String expectedMsg = "Value for --record-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-size").build(), expectedMsg); + } + + private void testInvalidRecordKey() { + String expectedMsg = "Value for --record-key-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-key-size").build(), expectedMsg); + } + + private void testInvalidNumHeaders() { + String expectedMsg = "Value for --num-headers must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--num-headers").build(), expectedMsg); + } + + private void testInvalidRecordHeaderKey() { + String expectedMsg = "Value for --record-header-key-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-header-key-size").build(), expectedMsg); + } + + private void testInvalidRecordHeaderValue() { + String expectedMsg = "Value for --record-header-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-header-size").build(), expectedMsg); + } + + private void testInvalidProducerAcks() { + String expectedMsg = "Invalid value for --producer-acks. Latency testing requires synchronous acknowledgement. 
Please use '1' or 'all'."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withZero("--producer-acks").build(), expectedMsg); + } + + private void assertInitializeInvalidOptionsExitCodeAndMsg(String[] args, String expectedMsg) { + Exit.setExitProcedure((exitCode, message) -> { + assertEquals(1, exitCode); + assertTrue(message.contains(expectedMsg)); + throw new RuntimeException(); + }); + try { + assertThrows(RuntimeException.class, () -> EndToEndLatency.execute(args)); + } finally { + Exit.resetExitProcedure(); + } } @Test - public void shouldFailWhenProducerAcksAreNotSynchronised() { - String[] args = new String[] {"localhost:9092", "test", "10000", "0", "200"}; - assertThrows(IllegalArgumentException.class, () -> EndToEndLatency.execute(args)); + @SuppressWarnings("removal") + public void testConvertLegacyArgs() throws Exception { + String[] legacyArgs = {"localhost:9092", "test", "100", "1", "200"}; + String[] convertedArgs = EndToEndLatency.convertLegacyArgsIfNeeded(legacyArgs); + String[] expectedArgs = { + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "100", + "--producer-acks", "1", + "--record-size", "200" + }; + assertArrayEquals(expectedArgs, convertedArgs); } @Test public void shouldFailWhenConsumerRecordsIsEmpty() { when(records.isEmpty()).thenReturn(true); - assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, new byte[0], records)); + assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, new byte[0], records, null, null)); } @Test @SuppressWarnings("unchecked") - public void shouldFailWhenSentIsNotEqualToReceived() { + public void shouldFailWhenSentRecordIsNotEqualToReceived() { Iterator> iterator = mock(Iterator.class); ConsumerRecord record = mock(ConsumerRecord.class); when(records.isEmpty()).thenReturn(false); when(records.iterator()).thenReturn(iterator); when(iterator.next()).thenReturn(record); - when(record.value()).thenReturn("kafkab".getBytes(StandardCharsets.UTF_8)); - assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records)); + when(record.value()).thenReturn(RECORD_VALUE_DIFFERENT); + assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, null)); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldFailWhenSentRecordKeyIsNotEqualToReceived() { + Iterator> iterator = mock(Iterator.class); + ConsumerRecord record = mock(ConsumerRecord.class); + when(records.isEmpty()).thenReturn(false); + when(records.iterator()).thenReturn(iterator); + when(iterator.next()).thenReturn(record); + when(record.value()).thenReturn(RECORD_VALUE); + when(record.key()).thenReturn(RECORD_KEY_DIFFERENT); + + assertThrows(RuntimeException.class, () -> + EndToEndLatency.validate(consumer, RECORD_VALUE, records, + RECORD_KEY, null)); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldFailWhenSentHeaderKeyIsNotEqualToReceived() { + Iterator> iterator = mock(Iterator.class); + ConsumerRecord record = mock(ConsumerRecord.class); + Headers headers = mock(Headers.class); + Iterator
headerIterator = mock(Iterator.class); + Header receivedHeader = new RecordHeader(HEADER_KEY_DIFFERENT, HEADER_VALUE); + + when(records.isEmpty()).thenReturn(false); + when(records.iterator()).thenReturn(iterator); + when(iterator.next()).thenReturn(record); + when(record.value()).thenReturn(RECORD_VALUE); + when(record.key()).thenReturn(null); + when(record.headers()).thenReturn(headers); + when(headers.iterator()).thenReturn(headerIterator); + when(headerIterator.hasNext()).thenReturn(true); + when(headerIterator.next()).thenReturn(receivedHeader); + + Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE); + List
<Header>
sentHeaders = List.of(sentHeader); + + assertThrows(RuntimeException.class, () -> + EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, sentHeaders)); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldFailWhenSentHeaderValueIsNotEqualToReceived() { + Iterator> iterator = mock(Iterator.class); + ConsumerRecord record = mock(ConsumerRecord.class); + Headers headers = mock(Headers.class); + Iterator
<Header>
headerIterator = mock(Iterator.class); + Header receivedHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE_DIFFERENT); + Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE); + List
<Header>
sentHeaders = List.of(sentHeader); + + when(records.isEmpty()).thenReturn(false); + when(records.iterator()).thenReturn(iterator); + when(iterator.next()).thenReturn(record); + when(record.value()).thenReturn(RECORD_VALUE); + when(record.key()).thenReturn(null); + when(record.headers()).thenReturn(headers); + when(headers.iterator()).thenReturn(headerIterator); + when(headerIterator.hasNext()).thenReturn(true); + when(headerIterator.next()).thenReturn(receivedHeader); + + assertThrows(RuntimeException.class, () -> + EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, sentHeaders)); } @Test @@ -83,9 +288,9 @@ public class EndToEndLatencyTest { when(records.isEmpty()).thenReturn(false); when(records.iterator()).thenReturn(iterator); when(iterator.next()).thenReturn(record); - when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8)); + when(record.value()).thenReturn(RECORD_VALUE); when(records.count()).thenReturn(2); - assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records)); + assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, null)); } @Test @@ -93,12 +298,40 @@ public class EndToEndLatencyTest { public void shouldPassInValidation() { Iterator> iterator = mock(Iterator.class); ConsumerRecord record = mock(ConsumerRecord.class); + Headers headers = mock(Headers.class); + Iterator
<Header>
headerIterator = mock(Iterator.class); + Header receivedHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE); + Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE); + List
<Header>
sentHeaders = List.of(sentHeader); + when(records.isEmpty()).thenReturn(false); when(records.iterator()).thenReturn(iterator); when(iterator.next()).thenReturn(record); - when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8)); + when(record.value()).thenReturn(RECORD_VALUE); + byte[] recordKey = RECORD_KEY; + when(record.key()).thenReturn(recordKey); when(records.count()).thenReturn(1); - assertDoesNotThrow(() -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records)); + when(record.headers()).thenReturn(headers); + when(headers.iterator()).thenReturn(headerIterator); + when(headerIterator.hasNext()).thenReturn(true, true, false); + when(headerIterator.next()).thenReturn(receivedHeader); + + assertDoesNotThrow(() -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, recordKey, sentHeaders)); + } + + @Test + public void shouldPassWithNamedArgs() { + AtomicReference exitStatus = new AtomicReference<>(); + Exit.setExitProcedure((status, __) -> { + exitStatus.set(status); + throw new RuntimeException(); + }); + try { + assertDoesNotThrow(() -> new EndToEndLatency.EndToEndLatencyCommandOptions(ArgsBuilder.defaults().build())); + assertNull(exitStatus.get()); + } finally { + Exit.resetExitProcedure(); + } } }
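Usage note (supplementary, not part of the diff): a minimal invocation sketch for the patched tool. The ./bin/kafka-e2e-latency.sh wrapper name is assumed from the Javadoc example above; the flags mirror the options defined in EndToEndLatencyCommandOptions in this patch.

# Named-argument form introduced by this patch, including the new optional key/header sizing flags
./bin/kafka-e2e-latency.sh --bootstrap-server localhost:9092 --topic test-topic \
    --num-records 1000 --producer-acks all --record-size 512 \
    --record-key-size 16 --num-headers 2 --record-header-key-size 8 --record-header-size 32

# Deprecated positional form, still accepted and rewritten by convertLegacyArgsIfNeeded (slated for removal in Apache Kafka 5.0)
./bin/kafka-e2e-latency.sh localhost:9092 test-topic 1000 all 512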