KAFKA-16724: Added support for fractional throughput and monotonic payload in kafka-producer-perf-test.sh

Added support for fractional throughput and monotonic payload in kafka-producer-perf-test.sh.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-kafka-producer-perf-test.sh

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
ShivsundarR 2024-06-11 11:19:31 +05:30 committed by GitHub
parent 31c55b630b
commit 68070c94a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 77 additions and 15 deletions

View File

@ -43,7 +43,7 @@ public class ThroughputThrottler {
private final long startMs; private final long startMs;
private final long sleepTimeNs; private final long sleepTimeNs;
private final long targetThroughput; private final double targetThroughput;
private long sleepDeficitNs = 0; private long sleepDeficitNs = 0;
private boolean wakeup = false; private boolean wakeup = false;
@ -52,11 +52,11 @@ public class ThroughputThrottler {
* @param targetThroughput Can be messages/sec or bytes/sec * @param targetThroughput Can be messages/sec or bytes/sec
* @param startMs When the very first message is sent * @param startMs When the very first message is sent
*/ */
public ThroughputThrottler(long targetThroughput, long startMs) { public ThroughputThrottler(double targetThroughput, long startMs) {
this.startMs = startMs; this.startMs = startMs;
this.targetThroughput = targetThroughput; this.targetThroughput = targetThroughput;
this.sleepTimeNs = targetThroughput > 0 ? this.sleepTimeNs = targetThroughput > 0 ?
NS_PER_SEC / targetThroughput : (long) (NS_PER_SEC / targetThroughput) :
Long.MAX_VALUE; Long.MAX_VALUE;
} }
@ -73,7 +73,7 @@ public class ThroughputThrottler {
} }
float elapsedSec = (sendStartMs - startMs) / 1000.f; float elapsedSec = (sendStartMs - startMs) / 1000.f;
return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput; return elapsedSec > 0 && ((double) amountSoFar / elapsedSec) > this.targetThroughput;
} }
/** /**

View File

@ -62,7 +62,8 @@ public class ProducerPerformance {
String topicName = res.getString("topic"); String topicName = res.getString("topic");
long numRecords = res.getLong("numRecords"); long numRecords = res.getLong("numRecords");
Integer recordSize = res.getInt("recordSize"); Integer recordSize = res.getInt("recordSize");
int throughput = res.getInt("throughput"); double throughput = res.getDouble("throughput");
boolean payloadMonotonic = res.getBoolean("payloadMonotonic");
List<String> producerProps = res.getList("producerConfig"); List<String> producerProps = res.getList("producerConfig");
String producerConfig = res.getString("producerConfigFile"); String producerConfig = res.getString("producerConfigFile");
String payloadFilePath = res.getString("payloadFile"); String payloadFilePath = res.getString("payloadFile");
@ -104,7 +105,7 @@ public class ProducerPerformance {
long transactionStartTime = 0; long transactionStartTime = 0;
for (long i = 0; i < numRecords; i++) { for (long i = 0; i < numRecords; i++) {
payload = generateRandomPayload(recordSize, payloadByteList, payload, random); payload = generateRandomPayload(recordSize, payloadByteList, payload, random, payloadMonotonic, i);
if (transactionsEnabled && currentTransactionSize == 0) { if (transactionsEnabled && currentTransactionSize == 0) {
producer.beginTransaction(); producer.beginTransaction();
@ -170,14 +171,16 @@ public class ProducerPerformance {
Stats stats; Stats stats;
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload, static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
SplittableRandom random) { SplittableRandom random, boolean payloadMonotonic, long recordValue) {
if (!payloadByteList.isEmpty()) { if (!payloadByteList.isEmpty()) {
payload = payloadByteList.get(random.nextInt(payloadByteList.size())); payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
} else if (recordSize != null) { } else if (recordSize != null) {
for (int j = 0; j < payload.length; ++j) for (int j = 0; j < payload.length; ++j)
payload[j] = (byte) (random.nextInt(26) + 65); payload[j] = (byte) (random.nextInt(26) + 65);
} else if (payloadMonotonic) {
payload = Long.toString(recordValue).getBytes(StandardCharsets.UTF_8);
} else { } else {
throw new IllegalArgumentException("no payload File Path or record Size provided"); throw new IllegalArgumentException("no payload File Path or record Size or payload-monotonic option provided");
} }
return payload; return payload;
} }
@ -258,7 +261,8 @@ public class ProducerPerformance {
.type(Integer.class) .type(Integer.class)
.metavar("RECORD-SIZE") .metavar("RECORD-SIZE")
.dest("recordSize") .dest("recordSize")
.help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file."); .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file " +
"or --payload-monotonic.");
payloadOptions.addArgument("--payload-file") payloadOptions.addArgument("--payload-file")
.action(store()) .action(store())
@ -268,7 +272,15 @@ public class ProducerPerformance {
.dest("payloadFile") .dest("payloadFile")
.help("file to read the message payloads from. This works only for UTF-8 encoded text files. " + .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " +
"Payloads will be read from this file and a payload will be randomly selected when sending messages. " + "Payloads will be read from this file and a payload will be randomly selected when sending messages. " +
"Note that you must provide exactly one of --record-size or --payload-file."); "Note that you must provide exactly one of --record-size or --payload-file or --payload-monotonic.");
payloadOptions.addArgument("--payload-monotonic")
.action(storeTrue())
.type(Boolean.class)
.metavar("PAYLOAD-MONOTONIC")
.dest("payloadMonotonic")
.help("payload is monotonically increasing integer. Note that you must provide exactly one of --record-size " +
"or --payload-file or --payload-monotonic.");
parser.addArgument("--payload-delimiter") parser.addArgument("--payload-delimiter")
.action(store()) .action(store())
@ -284,7 +296,7 @@ public class ProducerPerformance {
parser.addArgument("--throughput") parser.addArgument("--throughput")
.action(store()) .action(store())
.required(true) .required(true)
.type(Integer.class) .type(Double.class)
.metavar("THROUGHPUT") .metavar("THROUGHPUT")
.help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling."); .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.");

View File

@ -155,6 +155,31 @@ public class ProducerPerformanceTest {
verify(producerMock, times(1)).close(); verify(producerMock, times(1)).close();
} }
@Test
public void testMutuallyExclusiveGroup() {
String[] args1 = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "5",
"--throughput", "100",
"--record-size", "100",
"--payload-monotonic",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser1 = ProducerPerformance.argParser();
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser1.parseArgs(args1));
assertEquals("argument --payload-monotonic: not allowed with argument --record-size", thrown.getMessage());
String[] args2 = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "5",
"--throughput", "100",
"--payload-file", "abc.txt",
"--payload-monotonic",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser2 = ProducerPerformance.argParser();
thrown = assertThrows(ArgumentParserException.class, () -> parser2.parseArgs(args2));
assertEquals("argument --payload-monotonic: not allowed with argument --payload-file", thrown.getMessage());
}
@Test @Test
public void testUnexpectedArg() { public void testUnexpectedArg() {
String[] args = new String[] { String[] args = new String[] {
@ -169,6 +194,18 @@ public class ProducerPerformanceTest {
assertEquals("unrecognized arguments: '--test'", thrown.getMessage()); assertEquals("unrecognized arguments: '--test'", thrown.getMessage());
} }
@Test
public void testFractionalThroughput() {
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "5",
"--throughput", "1.25",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser = ProducerPerformance.argParser();
assertDoesNotThrow(() -> parser.parseArgs(args));
}
@Test @Test
public void testGenerateRandomPayloadByPayloadFile() { public void testGenerateRandomPayloadByPayloadFile() {
Integer recordSize = null; Integer recordSize = null;
@ -179,7 +216,7 @@ public class ProducerPerformanceTest {
byte[] payload = null; byte[] payload = null;
SplittableRandom random = new SplittableRandom(0); SplittableRandom random = new SplittableRandom(0);
payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random); payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L);
assertEquals(inputString, new String(payload)); assertEquals(inputString, new String(payload));
} }
@ -190,12 +227,25 @@ public class ProducerPerformanceTest {
List<byte[]> payloadByteList = new ArrayList<>(); List<byte[]> payloadByteList = new ArrayList<>();
SplittableRandom random = new SplittableRandom(0); SplittableRandom random = new SplittableRandom(0);
payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random); payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L);
for (byte b : payload) { for (byte b : payload) {
assertNotEquals(0, b); assertNotEquals(0, b);
} }
} }
@Test
public void testGenerateMonotonicPayload() {
byte[] payload = null;
List<byte[]> payloadByteList = new ArrayList<>();
SplittableRandom random = new SplittableRandom(0);
for (int i = 0; i < 10; i++) {
payload = ProducerPerformance.generateRandomPayload(null, payloadByteList, payload, random, true, i);
assertEquals(1, payload.length);
assertEquals(i + '0', payload[0]);
}
}
@Test @Test
public void testGenerateRandomPayloadException() { public void testGenerateRandomPayloadException() {
Integer recordSize = null; Integer recordSize = null;
@ -203,8 +253,8 @@ public class ProducerPerformanceTest {
List<byte[]> payloadByteList = new ArrayList<>(); List<byte[]> payloadByteList = new ArrayList<>();
SplittableRandom random = new SplittableRandom(0); SplittableRandom random = new SplittableRandom(0);
IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random)); IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L));
assertEquals("no payload File Path or record Size provided", thrown.getMessage()); assertEquals("no payload File Path or record Size or payload-monotonic option provided", thrown.getMessage());
} }
@Test @Test