MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957)

Reivewers: Colin McCabe <cmccabe@apache.org>
This commit is contained in:
Stanislav Kozlovski 2018-11-28 17:13:21 +00:00 committed by Colin Patrick McCabe
parent c32b7e5a9f
commit ec668180d7
4 changed files with 8 additions and 8 deletions

View File

@ -91,7 +91,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final String consumerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final int maxMessages;
private final long maxMessages;
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
@ -105,7 +105,7 @@ public class ConsumeBenchSpec extends TaskSpec {
@JsonProperty("consumerNode") String consumerNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") int maxMessages,
@JsonProperty("maxMessages") long maxMessages,
@JsonProperty("consumerGroup") String consumerGroup,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@ -146,7 +146,7 @@ public class ConsumeBenchSpec extends TaskSpec {
}
@JsonProperty
public int maxMessages() {
public long maxMessages() {
return maxMessages;
}

View File

@ -233,7 +233,7 @@ public class ConsumeBenchWorker implements TaskWorker {
long bytesConsumed = 0;
long startTimeMs = Time.SYSTEM.milliseconds();
long startBatchMs = startTimeMs;
int maxMessages = spec.maxMessages();
long maxMessages = spec.maxMessages();
try {
while (messagesConsumed < maxMessages) {
ConsumerRecords<byte[], byte[]> records = consumer.poll();

View File

@ -64,7 +64,7 @@ public class ProduceBenchSpec extends TaskSpec {
private final String producerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final int maxMessages;
private final long maxMessages;
private final PayloadGenerator keyGenerator;
private final PayloadGenerator valueGenerator;
private final Optional<TransactionGenerator> transactionGenerator;
@ -80,7 +80,7 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("producerNode") String producerNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") int maxMessages,
@JsonProperty("maxMessages") long maxMessages,
@JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("transactionGenerator") Optional<TransactionGenerator> txGenerator,
@ -124,7 +124,7 @@ public class ProduceBenchSpec extends TaskSpec {
}
@JsonProperty
public int maxMessages() {
public long maxMessages() {
return maxMessages;
}

View File

@ -225,7 +225,7 @@ public class ProduceBenchWorker implements TaskWorker {
if (enableTransactions)
producer.initTransactions();
int sentMessages = 0;
long sentMessages = 0;
while (sentMessages < spec.maxMessages()) {
if (enableTransactions) {
boolean tookAction = takeTransactionAction();