From a37bf5fffa4a607ded14ec055276d76f19df2d50 Mon Sep 17 00:00:00 2001 From: Sandesh K Date: Thu, 19 Jan 2017 17:46:59 -0800 Subject: [PATCH] KAFKA-4432; Added support to supply custom message payloads to perf-producer script. Current implementation of ProducerPerformance creates a static payload. This is not very useful in testing compression or when you want to test with production/custom payloads. So, we decided to add support for providing a payload file as an input to the producer perf test script. We made the following changes: 1. Added support to provide a payload file which can have the list of payloads that you actually want to send. 2. Moved payload generation inside the send loop for cases when a payload file is provided. Following are the changes to how the producer-performance is invoked: 1. You must provide "--record-size" or "--payload-file" but not both. This is because record size cannot be guaranteed when you are using custom events. e.g. ./kafka-producer-perf-test.sh --topic test_topic --num-records 100000 --producer-props bootstrap.servers=127.0.0.1:9092 acks=0 buffer.memory=33554432 compression.type=gzip batch.size=10240 linger.ms=10 --throughput -1 --payload-file ./test_payloads --payload-delimiter , 2. Earlier, "--record-size" was a required config; now you must provide exactly one of "--record-size" or "--payload-file". Providing both will result in an error. 3. 
Support for an additional parameter "--payload-delimiter" has been added which defaults to "\n" Author: Sandesh K Reviewers: dan norwood , Jun Rao Closes #2158 from SandeshKarkera/PerfProducerChanges --- .../kafka/tools/ProducerPerformance.java | 82 ++++++++++++++++--- 1 file changed, 72 insertions(+), 10 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index a13d3ec68fc..c277b833307 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
@@ -14,11 +14,18 @@ package org.apache.kafka.tools; import static net.sourceforge.argparse4j.impl.Arguments.store; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.Random; +import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -42,15 +49,36 @@ public class ProducerPerformance { /* parse args */ String topicName = res.getString("topic"); long numRecords = res.getLong("numRecords"); - int recordSize = res.getInt("recordSize"); + Integer recordSize = res.getInt("recordSize"); int throughput = res.getInt("throughput"); List producerProps = res.getList("producerConfig"); String producerConfig = res.getString("producerConfigFile"); + String payloadFilePath = res.getString("payloadFile"); + + // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here. + String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? 
"\n" : res.getString("payloadDelimiter"); if (producerProps == null && producerConfig == null) { throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); } + List payloadByteList = new ArrayList<>(); + if (payloadFilePath != null) { + Path path = Paths.get(payloadFilePath); + System.out.println("Reading payloads from: " + path.toAbsolutePath()); + if (Files.notExists(path) || Files.size(path) == 0) { + throw new IllegalArgumentException("File does not exist or empty file provided."); + } + + String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter); + + System.out.println("Number of messages read: " + payloadList.length); + + for (String payload : payloadList) { + payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8)); + } + } + Properties props = new Properties(); if (producerConfig != null) { props.putAll(Utils.loadProps(producerConfig)); @@ -68,16 +96,24 @@ public class ProducerPerformance { KafkaProducer producer = new KafkaProducer(props); /* setup perf test */ - byte[] payload = new byte[recordSize]; + byte[] payload = null; Random random = new Random(0); - for (int i = 0; i < payload.length; ++i) - payload[i] = (byte) (random.nextInt(26) + 65); - ProducerRecord record = new ProducerRecord<>(topicName, payload); + if (recordSize != null) { + payload = new byte[recordSize]; + for (int i = 0; i < payload.length; ++i) + payload[i] = (byte) (random.nextInt(26) + 65); + } + ProducerRecord record; Stats stats = new Stats(numRecords, 5000); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); for (int i = 0; i < numRecords; i++) { + if (payloadFilePath != null) { + payload = payloadByteList.get(random.nextInt(payloadByteList.size())); + } + record = new ProducerRecord<>(topicName, payload); + long sendStartMs = System.currentTimeMillis(); Callback cb = stats.nextCompletion(sendStartMs, payload.length, 
stats); producer.send(record, cb); @@ -109,6 +145,11 @@ public class ProducerPerformance { .defaultHelp(true) .description("This tool is used to verify the producer performance."); + MutuallyExclusiveGroup payloadOptions = parser + .addMutuallyExclusiveGroup() + .required(true) + .description("either --record-size or --payload-file must be specified but not both."); + parser.addArgument("--topic") .action(store()) .required(true) @@ -124,13 +165,34 @@ public class ProducerPerformance { .dest("numRecords") .help("number of messages to produce"); - parser.addArgument("--record-size") + payloadOptions.addArgument("--record-size") .action(store()) - .required(true) + .required(false) .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") - .help("message size in bytes"); + .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file."); + + payloadOptions.addArgument("--payload-file") + .action(store()) + .required(false) + .type(String.class) + .metavar("PAYLOAD-FILE") + .dest("payloadFile") + .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " + + "Payloads will be read from this file and a payload will be randomly selected when sending messages. " + + "Note that you must provide exactly one of --record-size or --payload-file."); + + parser.addArgument("--payload-delimiter") + .action(store()) + .required(false) + .type(String.class) + .metavar("PAYLOAD-DELIMITER") + .dest("payloadDelimiter") + .setDefault("\\n") + .help("provides delimiter to be used when --payload-file is provided. " + + "Defaults to new line. " + + "Note that this parameter will be ignored if --payload-file is not provided."); parser.addArgument("--throughput") .action(store())