mirror of https://github.com/apache/kafka.git
KAFKA-4432; Added support to supply custom message payloads to perf-producer script.
The current implementation of ProducerPerformance generates a static payload. This is not very useful for testing compression or for testing with production/custom payloads, so we added support for supplying a payload file as input to the producer perf test script. We made the following changes:
1. Added support for providing a payload file containing the list of payloads you actually want to send.
2. Moved payload generation inside the send loop for the case where a payload file is provided.

The following are the changes to how producer-performance is invoked:
1. You must provide "--record-size" or "--payload-file", but not both, because record size cannot be guaranteed when you are using custom payloads.
   e.g. ./kafka-producer-perf-test.sh --topic test_topic --num-records 100000 --producer-props bootstrap.servers=127.0.0.1:9092 acks=0 buffer.memory=33554432 compression.type=gzip batch.size=10240 linger.ms=10 --throughput -1 --payload-file ./test_payloads --payload-delimiter ,
2. Previously "--record-size" was a required argument; now you must provide exactly one of "--record-size" or "--payload-file". Providing both results in an error.
3. An additional parameter "--payload-delimiter" has been added, which defaults to "\n".

Author: Sandesh K <sandesh.karkera@flipkart.com>

Reviewers: dan norwood <norwood@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #2158 from SandeshKarkera/PerfProducerChanges
parent 813897a006
commit a37bf5fffa
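For illustration only, here is a minimal standalone sketch of the payload-file handling that the diff below introduces: read the file as UTF-8, split it on the delimiter, and pick a random payload for each record sent. The class name, the ./test_payloads path, and the comma delimiter are placeholder assumptions, not part of the patch; the real tool also keeps the fixed-size random payload path when --record-size is given.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class PayloadFileSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder inputs; in the real script these come from --payload-file and --payload-delimiter.
        String payloadFilePath = "./test_payloads";
        String payloadDelimiter = ",";

        Path path = Paths.get(payloadFilePath);
        if (Files.notExists(path) || Files.size(path) == 0) {
            throw new IllegalArgumentException("File does not exist or empty file provided.");
        }

        // Read the whole file as UTF-8 and split it into individual payloads.
        String[] payloadList = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).split(payloadDelimiter);
        List<byte[]> payloadByteList = new ArrayList<>();
        for (String payload : payloadList) {
            payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Number of messages read: " + payloadByteList.size());

        // In the send loop, one payload is selected at random per record.
        Random random = new Random(0);
        byte[] payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
        System.out.println("Selected a payload of " + payload.length + " bytes");
    }
}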
@@ -14,11 +14,18 @@ package org.apache.kafka.tools;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -42,15 +49,36 @@ public class ProducerPerformance {
             /* parse args */
             String topicName = res.getString("topic");
             long numRecords = res.getLong("numRecords");
-            int recordSize = res.getInt("recordSize");
+            Integer recordSize = res.getInt("recordSize");
             int throughput = res.getInt("throughput");
             List<String> producerProps = res.getList("producerConfig");
             String producerConfig = res.getString("producerConfigFile");
+            String payloadFilePath = res.getString("payloadFile");
+
+            // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
+            String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");
 
             if (producerProps == null && producerConfig == null) {
                 throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
             }
 
+            List<byte[]> payloadByteList = new ArrayList<>();
+            if (payloadFilePath != null) {
+                Path path = Paths.get(payloadFilePath);
+                System.out.println("Reading payloads from: " + path.toAbsolutePath());
+                if (Files.notExists(path) || Files.size(path) == 0) {
+                    throw new IllegalArgumentException("File does not exist or empty file provided.");
+                }
+
+                String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter);
+
+                System.out.println("Number of messages read: " + payloadList.length);
+
+                for (String payload : payloadList) {
+                    payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
+                }
+            }
+
             Properties props = new Properties();
             if (producerConfig != null) {
                 props.putAll(Utils.loadProps(producerConfig));
@@ -68,16 +96,24 @@ public class ProducerPerformance {
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
 
             /* setup perf test */
-            byte[] payload = new byte[recordSize];
+            byte[] payload = null;
             Random random = new Random(0);
+            if (recordSize != null) {
+                payload = new byte[recordSize];
             for (int i = 0; i < payload.length; ++i)
                 payload[i] = (byte) (random.nextInt(26) + 65);
-            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
+            }
+            ProducerRecord<byte[], byte[]> record;
             Stats stats = new Stats(numRecords, 5000);
             long startMs = System.currentTimeMillis();
 
             ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
             for (int i = 0; i < numRecords; i++) {
+                if (payloadFilePath != null) {
+                    payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
+                }
+                record = new ProducerRecord<>(topicName, payload);
+
                 long sendStartMs = System.currentTimeMillis();
                 Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
                 producer.send(record, cb);
@@ -109,6 +145,11 @@ public class ProducerPerformance {
                 .defaultHelp(true)
                 .description("This tool is used to verify the producer performance.");
 
+        MutuallyExclusiveGroup payloadOptions = parser
+                .addMutuallyExclusiveGroup()
+                .required(true)
+                .description("either --record-size or --payload-file must be specified but not both.");
+
         parser.addArgument("--topic")
                 .action(store())
                 .required(true)
@@ -124,13 +165,34 @@ public class ProducerPerformance {
                 .dest("numRecords")
                 .help("number of messages to produce");
 
-        parser.addArgument("--record-size")
+        payloadOptions.addArgument("--record-size")
                 .action(store())
-                .required(true)
+                .required(false)
                 .type(Integer.class)
                 .metavar("RECORD-SIZE")
                 .dest("recordSize")
-                .help("message size in bytes");
+                .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.");
 
+        payloadOptions.addArgument("--payload-file")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("PAYLOAD-FILE")
+                .dest("payloadFile")
+                .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " +
+                        "Payloads will be read from this file and a payload will be randomly selected when sending messages. " +
+                        "Note that you must provide exactly one of --record-size or --payload-file.");
+
+        parser.addArgument("--payload-delimiter")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("PAYLOAD-DELIMITER")
+                .dest("payloadDelimiter")
+                .setDefault("\\n")
+                .help("provides delimiter to be used when --payload-file is provided. " +
+                        "Defaults to new line. " +
+                        "Note that this parameter will be ignored if --payload-file is not provided.");
+
         parser.addArgument("--throughput")
                 .action(store())