KAFKA-5491; Enable transactions in ProducerPerformance Tool

With this patch, the `ProducePerfomance` tool can create transactions of differing durations.

This patch was used to to collect the initial set of benchmarks for transaction performance, documented here: https://docs.google.com/spreadsheets/d/1dHY6M7qCiX-NFvsgvaE0YoVdNq26uA8608XIh_DUpI4/edit#gid=282787170

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3400 from apurvam/MINOR-add-transaction-size-to-producre-perf

(cherry picked from commit bc47e9d6ca)
Signed-off-by: Jun Rao <junrao@gmail.com>
This commit is contained in:
Apurva Mehta 2017-06-21 14:41:51 -07:00 committed by Jun Rao
parent 1d65f15f2b
commit 701297f1d6
1 changed files with 47 additions and 1 deletions

View File

@ -59,7 +59,10 @@ public class ProducerPerformance {
List<String> producerProps = res.getList("producerConfig");
String producerConfig = res.getString("producerConfigFile");
String payloadFilePath = res.getString("payloadFile");
String transactionalId = res.getString("transactionalId");
boolean shouldPrintMetrics = res.getBoolean("printMetrics");
long transactionDurationMs = res.getLong("transactionDurationMs");
boolean transactionsEnabled = 0 < transactionDurationMs;
// since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");
@ -99,7 +102,13 @@ public class ProducerPerformance {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
if (transactionsEnabled)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
if (transactionsEnabled)
producer.initTransactions();
/* setup perf test */
byte[] payload = null;
@ -114,7 +123,16 @@ public class ProducerPerformance {
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
int currentTransactionSize = 0;
long transactionStartTime = 0;
for (int i = 0; i < numRecords; i++) {
if (transactionsEnabled && currentTransactionSize == 0) {
producer.beginTransaction();
transactionStartTime = System.currentTimeMillis();
}
if (payloadFilePath != null) {
payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
}
@ -124,11 +142,20 @@ public class ProducerPerformance {
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
currentTransactionSize++;
if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
producer.commitTransaction();
currentTransactionSize = 0;
}
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
if (transactionsEnabled && currentTransactionSize != 0)
producer.commitTransaction();
if (!shouldPrintMetrics) {
producer.close();
@ -246,6 +273,25 @@ public class ProducerPerformance {
.dest("printMetrics")
.help("print out metrics at the end of the test.");
parser.addArgument("--transactional-id")
.action(store())
.required(false)
.type(String.class)
.metavar("TRANSACTIONAL-ID")
.dest("transactionalId")
.setDefault("performance-producer-default-transactional-id")
.help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");
parser.addArgument("--transaction-duration-ms")
.action(store())
.required(false)
.type(Long.class)
.metavar("TRANSACTION-DURATION")
.dest("transactionDurationMs")
.setDefault(0L)
.help("The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive.");
return parser;
}