From 68070c94a6bd5141c720aaff85fa545da3a7a1b5 Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Tue, 11 Jun 2024 11:19:31 +0530 Subject: [PATCH] KAFKA-16724: Added support for fractional throughput and monotonic payload in kafka-producer-perf-test.sh Added support for fractional throughput and monotonic payload in kafka-producer-perf-test.sh. https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-kafka-producer-perf-test.sh Reviewers: Andrew Schofield , Manikumar Reddy --- .../server/util/ThroughputThrottler.java | 8 +-- .../kafka/tools/ProducerPerformance.java | 26 ++++++--- .../kafka/tools/ProducerPerformanceTest.java | 58 +++++++++++++++++-- 3 files changed, 77 insertions(+), 15 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java b/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java index 84a12b3732c..8f0ea551410 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java @@ -43,7 +43,7 @@ public class ThroughputThrottler { private final long startMs; private final long sleepTimeNs; - private final long targetThroughput; + private final double targetThroughput; private long sleepDeficitNs = 0; private boolean wakeup = false; @@ -52,11 +52,11 @@ public class ThroughputThrottler { * @param targetThroughput Can be messages/sec or bytes/sec * @param startMs When the very first message is sent */ - public ThroughputThrottler(long targetThroughput, long startMs) { + public ThroughputThrottler(double targetThroughput, long startMs) { this.startMs = startMs; this.targetThroughput = targetThroughput; this.sleepTimeNs = targetThroughput > 0 ? - NS_PER_SEC / targetThroughput : + (long) (NS_PER_SEC / targetThroughput) : Long.MAX_VALUE; } @@ -73,7 +73,7 @@ public class ThroughputThrottler { } float elapsedSec = (sendStartMs - startMs) / 1000.f; - return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput; + return elapsedSec > 0 && ((double) amountSoFar / elapsedSec) > this.targetThroughput; } /** diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 7efe151ee39..6d3a4c530d5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -62,7 +62,8 @@ public class ProducerPerformance { String topicName = res.getString("topic"); long numRecords = res.getLong("numRecords"); Integer recordSize = res.getInt("recordSize"); - int throughput = res.getInt("throughput"); + double throughput = res.getDouble("throughput"); + boolean payloadMonotonic = res.getBoolean("payloadMonotonic"); List producerProps = res.getList("producerConfig"); String producerConfig = res.getString("producerConfigFile"); String payloadFilePath = res.getString("payloadFile"); @@ -104,7 +105,7 @@ public class ProducerPerformance { long transactionStartTime = 0; for (long i = 0; i < numRecords; i++) { - payload = generateRandomPayload(recordSize, payloadByteList, payload, random); + payload = generateRandomPayload(recordSize, payloadByteList, payload, random, payloadMonotonic, i); if (transactionsEnabled && currentTransactionSize == 0) { producer.beginTransaction(); @@ -170,14 +171,16 @@ public class ProducerPerformance { Stats stats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, - SplittableRandom random) { + SplittableRandom random, boolean payloadMonotonic, long recordValue) { if (!payloadByteList.isEmpty()) { payload = payloadByteList.get(random.nextInt(payloadByteList.size())); } else if (recordSize != null) { for (int j = 0; j < payload.length; ++j) payload[j] = (byte) (random.nextInt(26) + 65); + } else if (payloadMonotonic) { + payload = Long.toString(recordValue).getBytes(StandardCharsets.UTF_8); } else { - throw new IllegalArgumentException("no payload File Path or record Size provided"); + throw new IllegalArgumentException("no payload File Path or record Size or payload-monotonic option provided"); } return payload; } @@ -258,7 +261,8 @@ public class ProducerPerformance { .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") - .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file."); + .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file " + + "or --payload-monotonic."); payloadOptions.addArgument("--payload-file") .action(store()) @@ -268,7 +272,15 @@ public class ProducerPerformance { .dest("payloadFile") .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " + "Payloads will be read from this file and a payload will be randomly selected when sending messages. " + - "Note that you must provide exactly one of --record-size or --payload-file."); + "Note that you must provide exactly one of --record-size or --payload-file or --payload-monotonic."); + + payloadOptions.addArgument("--payload-monotonic") + .action(storeTrue()) + .type(Boolean.class) + .metavar("PAYLOAD-MONOTONIC") + .dest("payloadMonotonic") + .help("payload is monotonically increasing integer. Note that you must provide exactly one of --record-size " + + "or --payload-file or --payload-monotonic."); parser.addArgument("--payload-delimiter") .action(store()) @@ -284,7 +296,7 @@ public class ProducerPerformance { parser.addArgument("--throughput") .action(store()) .required(true) - .type(Integer.class) + .type(Double.class) .metavar("THROUGHPUT") .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling."); diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 39893e33bb1..e181913591c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -155,6 +155,31 @@ public class ProducerPerformanceTest { verify(producerMock, times(1)).close(); } + @Test + public void testMutuallyExclusiveGroup() { + String[] args1 = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--payload-monotonic", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser1 = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser1.parseArgs(args1)); + assertEquals("argument --payload-monotonic: not allowed with argument --record-size", thrown.getMessage()); + + String[] args2 = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--payload-file", "abc.txt", + "--payload-monotonic", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser2 = ProducerPerformance.argParser(); + thrown = assertThrows(ArgumentParserException.class, () -> parser2.parseArgs(args2)); + assertEquals("argument --payload-monotonic: not allowed with argument --payload-file", thrown.getMessage()); + } + @Test public void testUnexpectedArg() { String[] args = new String[] { @@ -169,6 +194,18 @@ public class ProducerPerformanceTest { assertEquals("unrecognized arguments: '--test'", thrown.getMessage()); } + @Test + public void testFractionalThroughput() { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "1.25", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + assertDoesNotThrow(() -> parser.parseArgs(args)); + } + @Test public void testGenerateRandomPayloadByPayloadFile() { Integer recordSize = null; @@ -179,7 +216,7 @@ public class ProducerPerformanceTest { byte[] payload = null; SplittableRandom random = new SplittableRandom(0); - payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random); + payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L); assertEquals(inputString, new String(payload)); } @@ -190,12 +227,25 @@ public class ProducerPerformanceTest { List payloadByteList = new ArrayList<>(); SplittableRandom random = new SplittableRandom(0); - payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random); + payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L); for (byte b : payload) { assertNotEquals(0, b); } } + @Test + public void testGenerateMonotonicPayload() { + byte[] payload = null; + List payloadByteList = new ArrayList<>(); + SplittableRandom random = new SplittableRandom(0); + + for (int i = 0; i < 10; i++) { + payload = ProducerPerformance.generateRandomPayload(null, payloadByteList, payload, random, true, i); + assertEquals(1, payload.length); + assertEquals(i + '0', payload[0]); + } + } + @Test public void testGenerateRandomPayloadException() { Integer recordSize = null; @@ -203,8 +253,8 @@ public class ProducerPerformanceTest { List payloadByteList = new ArrayList<>(); SplittableRandom random = new SplittableRandom(0); - IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random)); - assertEquals("no payload File Path or record Size provided", thrown.getMessage()); + IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L)); + assertEquals("no payload File Path or record Size or payload-monotonic option provided", thrown.getMessage()); } @Test