KAFKA-6771. Make specifying partitions more flexible (#4850)

This commit is contained in:
Colin Patrick McCabe 2018-04-16 00:55:13 -07:00 committed by Rajini Sivaram
parent 832b096f4f
commit 93e03414f7
12 changed files with 564 additions and 148 deletions

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Utilities for expanding strings that have range expressions in them.
*
* For example, 'foo[1-3]' would be expaneded to foo1, foo2, foo3.
* Strings that have no range expressions will not be expanded.
*/
public class StringExpander {
private final static Pattern NUMERIC_RANGE_PATTERN =
Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)");
public static HashSet<String> expand(String val) {
HashSet<String> set = new HashSet<>();
Matcher matcher = NUMERIC_RANGE_PATTERN.matcher(val);
if (!matcher.matches()) {
set.add(val);
return set;
}
String prequel = matcher.group(1);
String rangeStart = matcher.group(2);
String rangeEnd = matcher.group(3);
String epilog = matcher.group(4);
int rangeStartInt = Integer.parseInt(rangeStart);
int rangeEndInt = Integer.parseInt(rangeEnd);
if (rangeEndInt < rangeStartInt) {
throw new RuntimeException("Invalid range: start " + rangeStartInt +
" is higher than end " + rangeEndInt);
}
for (int i = rangeStartInt; i <= rangeEndInt; i++) {
set.add(String.format("%s%d%s", prequel, i, epilog));
}
return set;
}
}

View File

@ -41,10 +41,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
private final String topicRegex;
private final int startPartition;
private final int endPartition;
private final TopicsSpec activeTopics;
@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@ -56,9 +53,7 @@ public class ConsumeBenchSpec extends TaskSpec {
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("topicRegex") String topicRegex,
@JsonProperty("startPartition") int startPartition,
@JsonProperty("endPartition") int endPartition) {
@JsonProperty("activeTopics") TopicsSpec activeTopics) {
super(startMs, durationMs);
this.consumerNode = (consumerNode == null) ? "" : consumerNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@ -67,9 +62,7 @@ public class ConsumeBenchSpec extends TaskSpec {
this.consumerConf = configOrEmptyMap(consumerConf);
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.topicRegex = topicRegex;
this.startPartition = startPartition;
this.endPartition = endPartition;
this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : activeTopics.immutableCopy();
}
@JsonProperty
@ -108,18 +101,8 @@ public class ConsumeBenchSpec extends TaskSpec {
}
@JsonProperty
public String topicRegex() {
return topicRegex;
}
@JsonProperty
public int startPartition() {
return startPartition;
}
@JsonProperty
public int endPartition() {
return endPartition;
public TopicsSpec activeTopics() {
return activeTopics;
}
@Override

View File

@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
@ -40,6 +39,8 @@ import org.slf4j.LoggerFactory;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
@ -84,19 +85,14 @@ public class ConsumeBenchWorker implements TaskWorker {
@Override
public void run() {
try {
// find topics to consume from based on provided topic regular expression
if (spec.topicRegex() == null) {
throw new ConfigException(
"Must provide topic name or regular expression to match existing topics.");
HashSet<TopicPartition> partitions = new HashSet<>();
for (Map.Entry<String, PartitionsSpec> entry : spec.activeTopics().materialize().entrySet()) {
for (Integer partitionNumber : entry.getValue().partitionNumbers()) {
partitions.add(new TopicPartition(entry.getKey(), partitionNumber));
}
}
Collection<TopicPartition> topicPartitions =
WorkerUtils.getMatchingTopicPartitions(
log, spec.bootstrapServers(),
spec.commonClientConf(), spec.adminClientConf(),
spec.topicRegex(), spec.startPartition(), spec.endPartition());
log.info("Will consume from {}", topicPartitions);
executor.submit(new ConsumeMessages(topicPartitions));
log.info("Will consume from {}", partitions);
executor.submit(new ConsumeMessages(partitions));
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.trogdor.rest.Message;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Describes some partitions.
*/
public class PartitionsSpec extends Message {
private final static short DEFAULT_REPLICATION_FACTOR = 3;
private final static short DEFAULT_NUM_PARTITIONS = 1;
private final int numPartitions;
private final short replicationFactor;
private final Map<Integer, List<Integer>> partitionAssignments;
@JsonCreator
public PartitionsSpec(@JsonProperty("numPartitions") int numPartitions,
@JsonProperty("replicationFactor") short replicationFactor,
@JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments) {
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.partitionAssignments = partitionAssignments == null ?
new HashMap<Integer, List<Integer>>() : partitionAssignments;
}
@JsonProperty
public int numPartitions() {
return numPartitions;
}
public List<Integer> partitionNumbers() {
if (partitionAssignments.isEmpty()) {
ArrayList<Integer> partitionNumbers = new ArrayList<>();
int effectiveNumPartitions = numPartitions <= 0 ? DEFAULT_NUM_PARTITIONS : numPartitions;
for (int i = 0; i < effectiveNumPartitions; i++) {
partitionNumbers.add(i);
}
return partitionNumbers;
} else {
return new ArrayList<>(partitionAssignments.keySet());
}
}
@JsonProperty
public short replicationFactor() {
return replicationFactor;
}
@JsonProperty
public Map<Integer, List<Integer>> partitionAssignmentsap() {
return partitionAssignments;
}
public NewTopic newTopic(String topicName) {
if (partitionAssignments.isEmpty()) {
int effectiveNumPartitions = numPartitions <= 0 ?
DEFAULT_NUM_PARTITIONS : numPartitions;
short effectiveReplicationFactor = replicationFactor <= 0 ?
DEFAULT_REPLICATION_FACTOR : replicationFactor;
return new NewTopic(topicName, effectiveNumPartitions, effectiveReplicationFactor);
} else {
return new NewTopic(topicName, partitionAssignments);
}
}
}

View File

@ -32,11 +32,6 @@ import java.util.Set;
* The specification for a benchmark that produces messages to a set of topics.
*/
public class ProduceBenchSpec extends TaskSpec {
private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic";
private static final int DEFAULT_NUM_PARTITIONS = 1;
private static final short DEFAULT_REPLICATION_FACTOR = 3;
private final String producerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
@ -46,11 +41,8 @@ public class ProduceBenchSpec extends TaskSpec {
private final Map<String, String> producerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
private final int totalTopics;
private final int activeTopics;
private final String topicPrefix;
private final int numPartitions;
private final short replicationFactor;
private final TopicsSpec activeTopics;
private final TopicsSpec inactiveTopics;
@JsonCreator
public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@ -64,11 +56,8 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("totalTopics") int totalTopics,
@JsonProperty("activeTopics") int activeTopics,
@JsonProperty("topicPrefix") String topicPrefix,
@JsonProperty("partitionsPerTopic") int partitionsPerTopic,
@JsonProperty("replicationFactor") short replicationFactor) {
@JsonProperty("activeTopics") TopicsSpec activeTopics,
@JsonProperty("inactiveTopics") TopicsSpec inactiveTopics) {
super(startMs, durationMs);
this.producerNode = (producerNode == null) ? "" : producerNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@ -81,13 +70,10 @@ public class ProduceBenchSpec extends TaskSpec {
this.producerConf = configOrEmptyMap(producerConf);
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.totalTopics = totalTopics;
this.activeTopics = activeTopics;
this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix;
this.numPartitions = (partitionsPerTopic == 0)
? DEFAULT_NUM_PARTITIONS : partitionsPerTopic;
this.replicationFactor = (replicationFactor == 0)
? DEFAULT_REPLICATION_FACTOR : replicationFactor;
this.activeTopics = (activeTopics == null) ?
TopicsSpec.EMPTY : activeTopics.immutableCopy();
this.inactiveTopics = (inactiveTopics == null) ?
TopicsSpec.EMPTY : inactiveTopics.immutableCopy();
}
@JsonProperty
@ -136,28 +122,13 @@ public class ProduceBenchSpec extends TaskSpec {
}
@JsonProperty
public int totalTopics() {
return totalTopics;
}
@JsonProperty
public int activeTopics() {
public TopicsSpec activeTopics() {
return activeTopics;
}
@JsonProperty
public String topicPrefix() {
return topicPrefix;
}
@JsonProperty
public int numPartitions() {
return numPartitions;
}
@JsonProperty
public short replicationFactor() {
return replicationFactor;
public TopicsSpec inactiveTopics() {
return inactiveTopics;
}
@Override

View File

@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
@ -40,6 +40,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
@ -66,16 +68,6 @@ public class ProduceBenchWorker implements TaskWorker {
private KafkaFutureImpl<String> doneFuture;
/**
* Generate a topic name based on a topic number.
*
* @param topicIndex The topic number.
* @return The topic name.
*/
public String topicIndexToName(int topicIndex) {
return String.format("%s%05d", spec.topicPrefix(), topicIndex);
}
public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
this.id = id;
this.spec = spec;
@ -88,7 +80,9 @@ public class ProduceBenchWorker implements TaskWorker {
throw new IllegalStateException("ProducerBenchWorker is already running.");
}
log.info("{}: Activating ProduceBenchWorker with {}", id, spec);
this.executor = Executors.newScheduledThreadPool(1,
// Create an executor with 2 threads. We need the second thread so
// that the StatusUpdater can run in parallel with SendRecords.
this.executor = Executors.newScheduledThreadPool(2,
ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
this.status = status;
this.doneFuture = doneFuture;
@ -99,25 +93,31 @@ public class ProduceBenchWorker implements TaskWorker {
@Override
public void run() {
try {
if (spec.activeTopics() == 0) {
throw new ConfigException("Can't have activeTopics == 0.");
}
if (spec.totalTopics() < spec.activeTopics()) {
throw new ConfigException(String.format(
"activeTopics was %d, but totalTopics was only %d. activeTopics must " +
"be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
}
Map<String, NewTopic> newTopics = new HashMap<>();
for (int i = 0; i < spec.totalTopics(); i++) {
String name = topicIndexToName(i);
newTopics.put(name, new NewTopic(name, spec.numPartitions(),
spec.replicationFactor()));
HashSet<TopicPartition> active = new HashSet<>();
for (Map.Entry<String, PartitionsSpec> entry :
spec.activeTopics().materialize().entrySet()) {
String topicName = entry.getKey();
PartitionsSpec partSpec = entry.getValue();
newTopics.put(topicName, partSpec.newTopic(topicName));
for (Integer partitionNumber : partSpec.partitionNumbers()) {
active.add(new TopicPartition(topicName, partitionNumber));
}
}
status.update(new TextNode("Creating " + spec.totalTopics() + " topic(s)"));
if (active.isEmpty()) {
throw new RuntimeException("You must specify at least one active topic.");
}
for (Map.Entry<String, PartitionsSpec> entry :
spec.inactiveTopics().materialize().entrySet()) {
String topicName = entry.getKey();
PartitionsSpec partSpec = entry.getValue();
newTopics.put(topicName, partSpec.newTopic(topicName));
}
status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
spec.adminClientConf(), newTopics, false);
status.update(new TextNode("Created " + spec.totalTopics() + " topic(s)"));
executor.submit(new SendRecords());
status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
executor.submit(new SendRecords(active));
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
}
@ -167,6 +167,8 @@ public class ProduceBenchWorker implements TaskWorker {
}
public class SendRecords implements Callable<Void> {
private final HashSet<TopicPartition> activePartitions;
private final Histogram histogram;
private final Future<?> statusUpdaterFuture;
@ -179,7 +181,8 @@ public class ProduceBenchWorker implements TaskWorker {
private final Throttle throttle;
SendRecords() {
SendRecords(HashSet<TopicPartition> activePartitions) {
this.activePartitions = activePartitions;
this.histogram = new Histogram(5000);
int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
@ -201,13 +204,16 @@ public class ProduceBenchWorker implements TaskWorker {
try {
Future<RecordMetadata> future = null;
try {
Iterator<TopicPartition> iter = activePartitions.iterator();
for (int m = 0; m < spec.maxMessages(); m++) {
for (int i = 0; i < spec.activeTopics(); i++) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
topicIndexToName(i), 0, keys.next(), values.next());
future = producer.send(record,
new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
if (!iter.hasNext()) {
iter = activePartitions.iterator();
}
TopicPartition partition = iter.next();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
partition.topic(), partition.partition(), keys.next(), values.next());
future = producer.send(record,
new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
throttle.increment();
}
} finally {

View File

@ -17,6 +17,9 @@
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -27,6 +30,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
@ -35,6 +39,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
@ -46,33 +51,31 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class RoundTripWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
private static final int MESSAGE_SIZE = 512;
private static final int LOG_INTERVAL_MS = 5000;
private static final int LOG_NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "round_trip_topic";
private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class);
private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0);
private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker();
private ToReceiveTracker toReceiveTracker;
private final String id;
@ -80,18 +83,20 @@ public class RoundTripWorker implements TaskWorker {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExecutorService executor;
private ScheduledExecutorService executor;
private WorkerStatusTracker status;
private KafkaFutureImpl<String> doneFuture;
private KafkaProducer<byte[], byte[]> producer;
private PayloadGenerator payloadGenerator;
private KafkaConsumer<byte[], byte[]> consumer;
private CountDownLatch unackedSends;
private ToSendTracker toSendTracker;
public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
this.id = id;
this.spec = spec;
@ -104,8 +109,9 @@ public class RoundTripWorker implements TaskWorker {
throw new IllegalStateException("RoundTripWorker is already running.");
}
log.info("{}: Activating RoundTripWorker.", id);
this.executor = Executors.newCachedThreadPool(
this.executor = Executors.newScheduledThreadPool(3,
ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
this.status = status;
this.doneFuture = doneFuture;
this.producer = null;
this.consumer = null;
@ -120,16 +126,31 @@ public class RoundTripWorker implements TaskWorker {
if (spec.targetMessagesPerSec() <= 0) {
throw new ConfigException("Can't have targetMessagesPerSec <= 0.");
}
if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) {
throw new ConfigException("Invalid null or empty partitionAssignments.");
Map<String, NewTopic> newTopics = new HashMap<>();
HashSet<TopicPartition> active = new HashSet<>();
for (Map.Entry<String, PartitionsSpec> entry :
spec.activeTopics().materialize().entrySet()) {
String topicName = entry.getKey();
PartitionsSpec partSpec = entry.getValue();
newTopics.put(topicName, partSpec.newTopic(topicName));
for (Integer partitionNumber : partSpec.partitionNumbers()) {
active.add(new TopicPartition(topicName, partitionNumber));
}
}
WorkerUtils.createTopics(
log, spec.bootstrapServers(), spec.commonClientConf(), spec.adminClientConf(),
Collections.singletonMap(TOPIC_NAME,
new NewTopic(TOPIC_NAME, spec.partitionAssignments())),
true);
executor.submit(new ProducerRunnable());
executor.submit(new ConsumerRunnable());
if (active.isEmpty()) {
throw new RuntimeException("You must specify at least one active topic.");
}
status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
spec.adminClientConf(), newTopics, true);
status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
toSendTracker = new ToSendTracker(spec.maxMessages());
toReceiveTracker = new ToReceiveTracker();
executor.submit(new ProducerRunnable(active));
executor.submit(new ConsumerRunnable(active));
executor.submit(new StatusUpdater());
executor.scheduleWithFixedDelay(
new StatusUpdater(), 30, 30, TimeUnit.SECONDS);
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
}
@ -159,6 +180,10 @@ public class RoundTripWorker implements TaskWorker {
failed.add(index);
}
synchronized int frontier() {
return frontier;
}
synchronized ToSendTrackerResult next() {
if (failed.isEmpty()) {
if (frontier >= maxMessages) {
@ -173,9 +198,11 @@ public class RoundTripWorker implements TaskWorker {
}
class ProducerRunnable implements Runnable {
private final HashSet<TopicPartition> partitions;
private final Throttle throttle;
ProducerRunnable() {
ProducerRunnable(HashSet<TopicPartition> partitions) {
this.partitions = partitions;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
@ -195,11 +222,11 @@ public class RoundTripWorker implements TaskWorker {
@Override
public void run() {
final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages());
long messagesSent = 0;
long uniqueMessagesSent = 0;
log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", id);
try {
Iterator<TopicPartition> iter = partitions.iterator();
while (true) {
final ToSendTrackerResult result = toSendTracker.next();
if (result == null) {
@ -212,9 +239,13 @@ public class RoundTripWorker implements TaskWorker {
uniqueMessagesSent++;
}
messagesSent++;
if (!iter.hasNext()) {
iter = partitions.iterator();
}
TopicPartition partition = iter.next();
// we explicitly specify generator position based on message index
ProducerRecord<byte[], byte[]> record = new ProducerRecord(TOPIC_NAME, 0,
KEY_GENERATOR.generate(messageIndex),
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(partition.topic(),
partition.partition(), KEY_GENERATOR.generate(messageIndex),
spec.valueGenerator().generate(messageIndex));
producer.send(record, new Callback() {
@Override
@ -242,12 +273,23 @@ public class RoundTripWorker implements TaskWorker {
private class ToReceiveTracker {
private final TreeSet<Integer> pending = new TreeSet<>();
private int totalReceived = 0;
synchronized void addPending(int messageIndex) {
pending.add(messageIndex);
}
synchronized boolean removePending(int messageIndex) {
return pending.remove(messageIndex);
if (pending.remove(messageIndex)) {
totalReceived++;
return true;
} else {
return false;
}
}
synchronized int totalReceived() {
return totalReceived;
}
void log() {
@ -269,7 +311,7 @@ public class RoundTripWorker implements TaskWorker {
class ConsumerRunnable implements Runnable {
private final Properties props;
ConsumerRunnable() {
ConsumerRunnable(HashSet<TopicPartition> partitions) {
this.props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
@ -281,7 +323,7 @@ public class RoundTripWorker implements TaskWorker {
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
consumer.subscribe(Collections.singleton(TOPIC_NAME));
consumer.assign(partitions);
}
@Override
@ -296,7 +338,8 @@ public class RoundTripWorker implements TaskWorker {
try {
pollInvoked++;
ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) {
for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) {
ConsumerRecord<byte[], byte[]> record = iter.next();
int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
messagesReceived++;
if (toReceiveTracker.removePending(messageIndex)) {
@ -306,6 +349,7 @@ public class RoundTripWorker implements TaskWorker {
"Waiting for all sends to be acked...", id, spec.maxMessages());
unackedSends.await();
log.info("{}: all sends have been acked.", id);
new StatusUpdater().update();
doneFuture.complete("");
return;
}
@ -332,6 +376,46 @@ public class RoundTripWorker implements TaskWorker {
}
}
public class StatusUpdater implements Runnable {
@Override
public void run() {
try {
update();
} catch (Exception e) {
WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
}
}
StatusData update() {
StatusData statusData =
new StatusData(toSendTracker.frontier(), toReceiveTracker.totalReceived());
status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
return statusData;
}
}
public static class StatusData {
private final long totalUniqueSent;
private final long totalReceived;
@JsonCreator
public StatusData(@JsonProperty("totalUniqueSent") long totalUniqueSent,
@JsonProperty("totalReceived") long totalReceived) {
this.totalUniqueSent = totalUniqueSent;
this.totalReceived = totalReceived;
}
@JsonProperty
public long totalUniqueSent() {
return totalUniqueSent;
}
@JsonProperty
public long totalReceived() {
return totalReceived;
}
}
@Override
public void stop(Platform platform) throws Exception {
if (!running.compareAndSet(true, false)) {

View File

@ -25,11 +25,8 @@ import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
/**
* The specification for a workload that sends messages to a broker and then
@ -39,8 +36,8 @@ public class RoundTripWorkloadSpec extends TaskSpec {
private final String clientNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final NavigableMap<Integer, List<Integer>> partitionAssignments;
private final PayloadGenerator valueGenerator;
private final TopicsSpec activeTopics;
private final int maxMessages;
private final Map<String, String> commonClientConf;
private final Map<String, String> producerConf;
@ -57,17 +54,17 @@ public class RoundTripWorkloadSpec extends TaskSpec {
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>> partitionAssignments,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("activeTopics") TopicsSpec activeTopics,
@JsonProperty("maxMessages") int maxMessages) {
super(startMs, durationMs);
this.clientNode = clientNode == null ? "" : clientNode;
this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers;
this.targetMessagesPerSec = targetMessagesPerSec;
this.partitionAssignments = partitionAssignments == null ?
new TreeMap<Integer, List<Integer>>() : partitionAssignments;
this.valueGenerator = valueGenerator == null ?
new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator;
this.activeTopics = activeTopics == null ?
TopicsSpec.EMPTY : activeTopics.immutableCopy();
this.maxMessages = maxMessages;
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
@ -91,8 +88,8 @@ public class RoundTripWorkloadSpec extends TaskSpec {
}
@JsonProperty
public NavigableMap<Integer, List<Integer>> partitionAssignments() {
return partitionAssignments;
public TopicsSpec activeTopics() {
return activeTopics;
}
@JsonProperty

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.kafka.trogdor.common.StringExpander;
import org.apache.kafka.trogdor.rest.Message;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* TopicsSpec maps topic names to descriptions of the partitions in them.
*
* In JSON form, this is serialized as a map whose keys are topic names,
* and whose entries are partition descriptions.
* Keys may also refer to multiple partitions. For example, this specification
* refers to 3 topics foo1, foo2, and foo3:
*
* {
* "foo[1-3]" : {
* "numPartitions": 3
* "replicationFactor": 3
* }
* }
*/
public class TopicsSpec extends Message {
public static final TopicsSpec EMPTY = new TopicsSpec().immutableCopy();
private final Map<String, PartitionsSpec> map;
@JsonCreator
public TopicsSpec() {
this.map = new HashMap<>();
}
private TopicsSpec(Map<String, PartitionsSpec> map) {
this.map = map;
}
@JsonAnyGetter
public Map<String, PartitionsSpec> get() {
return map;
}
@JsonAnySetter
public void set(String name, PartitionsSpec value) {
map.put(name, value);
}
public TopicsSpec immutableCopy() {
HashMap<String, PartitionsSpec> mapCopy = new HashMap<>();
mapCopy.putAll(map);
return new TopicsSpec(Collections.unmodifiableMap(mapCopy));
}
/**
* Enumerate the partitions inside this TopicsSpec.
*
* @return A map from topic names to PartitionsSpec objects.
*/
public Map<String, PartitionsSpec> materialize() {
HashMap<String, PartitionsSpec> all = new HashMap<>();
for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) {
for (String topicName : StringExpander.expand(entry.getKey())) {
all.put(topicName, entry.getValue());
}
}
return all;
}
}

View File

@ -26,9 +26,10 @@ import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.apache.kafka.trogdor.workload.PartitionsSpec;
import org.apache.kafka.trogdor.workload.ProduceBenchSpec;
import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec;
import org.apache.kafka.trogdor.workload.TopicsSpec;
import org.junit.Test;
import java.lang.reflect.Field;
@ -49,10 +50,11 @@ public class JsonSerializationTest {
verify(new WorkerRunning(null, null, 0, null));
verify(new WorkerStopping(null, null, 0, null));
verify(new ProduceBenchSpec(0, 0, null, null,
0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
0, 0, null, null, null, null, null, null, null));
verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
0, null, null, 0));
verify(new SampleTaskSpec(0, 0, null, null));
verify(new TopicsSpec());
verify(new PartitionsSpec(0, (short) 0, null));
}
private <T> void verify(T val1) throws Exception {

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import static org.junit.Assert.assertEquals;
public class StringExpanderTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
public void testNoExpansionNeeded() throws Exception {
assertEquals(Collections.singleton("foo"), StringExpander.expand("foo"));
assertEquals(Collections.singleton("bar"), StringExpander.expand("bar"));
assertEquals(Collections.singleton(""), StringExpander.expand(""));
}
@Test
public void testExpansions() throws Exception {
HashSet<String> expected1 = new HashSet<>(Arrays.asList(
"foo1",
"foo2",
"foo3"
));
assertEquals(expected1, StringExpander.expand("foo[1-3]"));
HashSet<String> expected2 = new HashSet<>(Arrays.asList(
"foo bar baz 0"
));
assertEquals(expected2, StringExpander.expand("foo bar baz [0-0]"));
HashSet<String> expected3 = new HashSet<>(Arrays.asList(
"[[ wow50 ]]",
"[[ wow51 ]]",
"[[ wow52 ]]"
));
assertEquals(expected3, StringExpander.expand("[[ wow[50-52] ]]"));
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TopicsSpecTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
private final static TopicsSpec FOO;
private final static PartitionsSpec PARTSA;
private final static PartitionsSpec PARTSB;
static {
FOO = new TopicsSpec();
PARTSA = new PartitionsSpec(3, (short) 3, null);
FOO.set("topicA[0-2]", PARTSA);
Map<Integer, List<Integer>> assignmentsB = new HashMap<>();
assignmentsB.put(0, Arrays.asList(0, 1, 2));
assignmentsB.put(1, Arrays.asList(2, 3, 4));
PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB);
FOO.set("topicB", PARTSB);
}
@Test
public void testMaterialize() {
Map<String, PartitionsSpec> parts = FOO.materialize();
assertTrue(parts.containsKey("topicA0"));
assertTrue(parts.containsKey("topicA1"));
assertTrue(parts.containsKey("topicA2"));
assertTrue(parts.containsKey("topicB"));
assertEquals(4, parts.keySet().size());
assertEquals(PARTSA, parts.get("topicA0"));
assertEquals(PARTSA, parts.get("topicA1"));
assertEquals(PARTSA, parts.get("topicA2"));
assertEquals(PARTSB, parts.get("topicB"));
}
@Test
public void testPartitionNumbers() {
List<Integer> partsANumbers = PARTSA.partitionNumbers();
assertEquals(Integer.valueOf(0), partsANumbers.get(0));
assertEquals(Integer.valueOf(1), partsANumbers.get(1));
assertEquals(Integer.valueOf(2), partsANumbers.get(2));
assertEquals(3, partsANumbers.size());
List<Integer> partsBNumbers = PARTSB.partitionNumbers();
assertEquals(Integer.valueOf(0), partsBNumbers.get(0));
assertEquals(Integer.valueOf(1), partsBNumbers.get(1));
assertEquals(2, partsBNumbers.size());
}
}