KAFKA-7514: Add threads to ConsumeBenchWorker (#5864)

Add threads with separate consumers to ConsumeBenchWorker.  Update the Trogdor test scripts and documentation with the new functionality.

Reviewers: Colin McCabe <cmccabe@apache.org>
This commit is contained in:
Stanislav Kozlovski 2018-11-13 16:38:42 +00:00 committed by Colin Patrick McCabe
parent d00938fdf8
commit 8259fda695
8 changed files with 380 additions and 92 deletions

View File

@ -38,16 +38,14 @@ Let's confirm that all of the daemons are running:
Now, we can submit a test job to Trogdor. Here's an example of a short bash script which makes it easier.
> ./tests/bin/trogdor-run-produce-bench.sh
[2018-04-12 10:32:04,055] DEBUG Sending POST with input {"id":"produce_bench_22137","spec":{"class":"org.apache.kafka.trogdor.workload.ProduceBenchSpec","startMs":0,"durationMs":10000000,"producerNode":"node0","bootstrapServers":"localhost:9092","targetMessagesPerSec":10,"maxMessages":100,"keyGenerator":{"type":"sequential","size":4,"startOffset":0},"valueGenerator":{"type":"constant","size":512,"value":"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="},"totalTopics":10,"activeTopics":5,"topicPrefix":"foo","replicationFactor":1,"classLoader":{},"numPartitions":1}} to http://localhost:8889/coordinator/task/create (org.apache.kafka.trogdor.coordinator.CoordinatorClient)
Created task.
$TASK_ID = produce_bench_20462
Sent CreateTaskRequest for task produce_bench_21634.$TASK_ID = produce_bench_21634
To get the test results, we run --show-tasks:
./bin/trogdor.sh client --show-tasks localhost:8889
Got coordinator tasks: {
"tasks" : {
"produce_bench_20462" : {
"produce_bench_21634" : {
"state" : "DONE",
"spec" : {
"class" : "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
@ -55,8 +53,8 @@ To get the test results, we run --show-tasks:
"durationMs" : 10000000,
"producerNode" : "node0",
"bootstrapServers" : "localhost:9092",
"targetMessagesPerSec" : 10,
"maxMessages" : 100,
"targetMessagesPerSec" : 10000,
"maxMessages" : 50000,
"keyGenerator" : {
"type" : "sequential",
"size" : 4,
@ -67,22 +65,28 @@ To get the test results, we run --show-tasks:
"size" : 512,
"value" : "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
},
"totalTopics" : 10,
"activeTopics" : 5,
"topicPrefix" : "foo",
"replicationFactor" : 1,
"classLoader" : { },
"numPartitions" : 1
"activeTopics" : {
"foo[1-3]" : {
"numPartitions" : 10,
"replicationFactor" : 1
}
},
"inactiveTopics" : {
"foo[4-5]" : {
"numPartitions" : 10,
"replicationFactor" : 1
}
}
},
"startedMs" : 1523552769850,
"doneMs" : 1523552780878,
"startedMs" : 1541435949784,
"doneMs" : 1541435955803,
"cancelled" : false,
"status" : {
"totalSent" : 500,
"averageLatencyMs" : 4.972,
"p50LatencyMs" : 4,
"p95LatencyMs" : 6,
"p99LatencyMs" : 12
"totalSent" : 50000,
"averageLatencyMs" : 11.0293,
"p50LatencyMs" : 9,
"p95LatencyMs" : 27,
"p99LatencyMs" : 39
}
}
}
@ -141,7 +145,7 @@ ProduceBench starts a Kafka producer on a single agent node, producing to severa
RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer.
### ConsumeBench
ConsumeBench starts a Kafka consumer on a single agent node. Depending on the passed in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself.
ConsumeBench starts one or more Kafka consumers on a single agent node. Depending on the passed in configuration (see ConsumeBenchSpec), the consumers either subscribe to a set of topics (leveraging consumer group functionality and dynamic partition assignment) or manually assign partitions to themselves.
The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency.
Faults

View File

@ -25,7 +25,10 @@ cat <<EOF
"durationMs": 10000000,
"consumerNode": "node0",
"bootstrapServers": "localhost:9092",
"maxMessages": 100,
"targetMessagesPerSec": 1000,
"threadsPerWorker": 5,
"consumerGroup": "cg",
"maxMessages": 10000,
"activeTopics": ["foo[1-3]"]
}
}

View File

@ -25,17 +25,17 @@ cat <<EOF
"durationMs": 10000000,
"producerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 10,
"maxMessages": 100,
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
"activeTopics": {
"foo[1-3]": {
"numPartitions": 3,
"numPartitions": 10,
"replicationFactor": 1
}
},
"inactiveTopics": {
"foo[4-5]": {
"numPartitions": 3,
"numPartitions": 10,
"replicationFactor": 1
}
}

View File

@ -21,7 +21,7 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
class ConsumeBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
target_messages_per_sec, max_messages, active_topics,
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None):
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None, threads_per_worker=1):
super(ConsumeBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.ConsumeBenchSpec"
self.message["consumerNode"] = consumer_node
@ -32,6 +32,7 @@ class ConsumeBenchWorkloadSpec(TaskSpec):
self.message["adminClientConf"] = admin_client_conf
self.message["commonClientConf"] = common_client_conf
self.message["activeTopics"] = active_topics
self.message["threadsPerWorker"] = threads_per_worker
if consumer_group is not None:
self.message["consumerGroup"] = consumer_group

View File

@ -86,7 +86,7 @@ class ConsumeBenchTest(Test):
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
def test_consume_bench_single_partition(self):
def test_single_partition(self):
"""
Run a ConsumeBench against a single partition
"""
@ -107,9 +107,32 @@ class ConsumeBenchTest(Test):
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
def test_consume_group_bench(self):
def test_multiple_consumers_random_group_topics(self):
"""
Runs two ConsumeBench workloads in the same consumer group to read messages from topics
Runs multiple consumers group to read messages from topics.
Since a consumerGroup isn't specified, each consumer should read from all topics independently
"""
self.produce_messages(self.active_topics, max_messages=5000)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=5000, # all should read exactly 5k messages
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=5,
active_topics=["consume_bench_topic[0-5]"])
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
def test_two_consumers_specified_group_topics(self):
"""
Runs two consumers in the same consumer group to read messages from topics.
Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group
"""
self.produce_messages(self.active_topics)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
@ -120,13 +143,62 @@ class ConsumeBenchTest(Test):
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=2,
consumer_group="testGroup",
active_topics=["consume_bench_topic[0-5]"])
consume_workload_1 = self.trogdor.create_task("consume_workload_1", consume_spec)
consume_workload_2 = self.trogdor.create_task("consume_workload_2", consume_spec)
consume_workload_1.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload 1 finished")
consume_workload_2.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload 2 finished")
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
def test_multiple_consumers_random_group_partitions(self):
"""
Runs multiple consumers in to read messages from specific partitions.
Since a consumerGroup isn't specified, each consumer will get assigned a random group
and consume from all partitions
"""
self.produce_messages(self.active_topics, max_messages=20000)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=2000,
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=4,
active_topics=["consume_bench_topic1:[0-4]"])
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
def test_multiple_consumers_specified_group_partitions_should_raise(self):
"""
Runs multiple consumers in to read messages from specific partitions.
Since a consumerGroup isn't specified, each consumer will get assigned a random group
and consume from all partitions
"""
self.produce_messages(self.active_topics, max_messages=20000)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=2000,
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=4,
consumer_group="fail_group",
active_topics=["consume_bench_topic1:[0-4]"])
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
try:
consume_workload.wait_for_done(timeout_sec=360)
raise Exception("Should have raised an exception due to an invalid configuration")
except RuntimeError as e:
if 'Will not split partitions' not in str(e):
raise RuntimeError("Unexpected Exception - " + str(e))
self.logger.info(e)

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.trogdor.common.StringExpander;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
@ -61,6 +62,15 @@ import java.util.HashSet;
* #{@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)}.
* It will be assigned partitions dynamically from the consumer group.
*
* This specification supports the spawning of multiple consumers in the single Trogdor worker agent.
* The "threadsPerWorker" field denotes how many consumers should be spawned for this spec.
* It is worth noting that the "targetMessagesPerSec", "maxMessages" and "activeTopics" fields apply for every consumer individually.
*
* If a consumer group is not specified, every consumer is assigned a different, random group. When specified, all consumers use the same group.
* Since no two consumers in the same group can be assigned the same partition,
* explicitly specifying partitions in "activeTopics" when there are multiple "threadsPerWorker"
* and a particular "consumerGroup" will result in an #{@link ConfigException}, aborting the task.
*
* An example JSON representation which will result in a consumer that is part of the consumer group "cg" and
* subscribed to topics foo1, foo2, foo3 and bar.
* #{@code
@ -77,7 +87,6 @@ import java.util.HashSet;
*/
public class ConsumeBenchSpec extends TaskSpec {
static final String EMPTY_CONSUMER_GROUP = "";
private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+(:[\\d]+|[^:]*)$";
private final String consumerNode;
private final String bootstrapServers;
@ -88,6 +97,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final Map<String, String> commonClientConf;
private final List<String> activeTopics;
private final String consumerGroup;
private final int threadsPerWorker;
@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@ -100,6 +110,7 @@ public class ConsumeBenchSpec extends TaskSpec {
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("threadsPerWorker") Integer threadsPerWorker,
@JsonProperty("activeTopics") List<String> activeTopics) {
super(startMs, durationMs);
this.consumerNode = (consumerNode == null) ? "" : consumerNode;
@ -110,7 +121,8 @@ public class ConsumeBenchSpec extends TaskSpec {
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics;
this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : consumerGroup;
this.consumerGroup = consumerGroup == null ? "" : consumerGroup;
this.threadsPerWorker = threadsPerWorker == null ? 1 : threadsPerWorker;
}
@JsonProperty
@ -138,6 +150,11 @@ public class ConsumeBenchSpec extends TaskSpec {
return maxMessages;
}
@JsonProperty
public int threadsPerWorker() {
return threadsPerWorker;
}
@JsonProperty
public Map<String, String> consumerConf() {
return consumerConf;

View File

@ -19,11 +19,13 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
@ -39,19 +41,24 @@ import org.slf4j.LoggerFactory;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Properties;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class ConsumeBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
@ -61,10 +68,11 @@ public class ConsumeBenchWorker implements TaskWorker {
private final ConsumeBenchSpec spec;
private final AtomicBoolean running = new AtomicBoolean(false);
private ScheduledExecutorService executor;
private WorkerStatusTracker status;
private WorkerStatusTracker workerStatus;
private StatusUpdater statusUpdater;
private Future<?> statusUpdaterFuture;
private KafkaFutureImpl<String> doneFuture;
private KafkaConsumer<byte[], byte[]> consumer;
private ThreadSafeConsumer consumer;
public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) {
this.id = id;
this.spec = spec;
@ -77,9 +85,12 @@ public class ConsumeBenchWorker implements TaskWorker {
throw new IllegalStateException("ConsumeBenchWorker is already running.");
}
log.info("{}: Activating ConsumeBenchWorker with {}", id, spec);
this.statusUpdater = new StatusUpdater();
this.executor = Executors.newScheduledThreadPool(
2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
this.status = status;
spec.threadsPerWorker() + 2, // 1 thread for all the ConsumeStatusUpdater and 1 for the StatusUpdater
ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
this.statusUpdaterFuture = executor.scheduleAtFixedRate(this.statusUpdater, 1, 1, TimeUnit.MINUTES);
this.workerStatus = status;
this.doneFuture = doneFuture;
executor.submit(new Prepare());
}
@ -88,41 +99,75 @@ public class ConsumeBenchWorker implements TaskWorker {
@Override
public void run() {
try {
executor.submit(consumeTask());
List<Future<Void>> consumeTasks = new ArrayList<>();
for (ConsumeMessages task : consumeTasks()) {
consumeTasks.add(executor.submit(task));
}
executor.submit(new CloseStatusUpdater(consumeTasks));
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
}
}
private ConsumeMessages consumeTask() {
String consumerGroup = spec.consumerGroup();
private List<ConsumeMessages> consumeTasks() {
List<ConsumeMessages> tasks = new ArrayList<>();
String consumerGroup = consumerGroup();
int consumerCount = spec.threadsPerWorker();
Map<String, List<TopicPartition>> partitionsByTopic = spec.materializeTopics();
boolean toUseGroupPartitionAssignment = partitionsByTopic.values().isEmpty();
boolean toUseGroupPartitionAssignment = partitionsByTopic.values().stream().allMatch(List::isEmpty);
if (consumerGroup.equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP)) // consumer group is undefined, the consumer should use a random group
consumerGroup = generateConsumerGroup();
if (!toUseGroupPartitionAssignment && !toUseRandomConsumeGroup() && consumerCount > 1)
throw new ConfigException("You may not specify an explicit partition assignment when using multiple consumers in the same group."
+ "Please leave the consumer group unset, specify topics instead of partitions or use a single consumer.");
consumer = consumer(consumerGroup);
if (!toUseGroupPartitionAssignment)
partitionsByTopic = populatePartitionsByTopic(consumer, partitionsByTopic);
consumer = consumer(consumerGroup, clientId(0));
if (toUseGroupPartitionAssignment) {
Set<String> topics = partitionsByTopic.keySet();
tasks.add(new ConsumeMessages(consumer, topics));
return new ConsumeMessages(consumer, partitionsByTopic, toUseGroupPartitionAssignment);
for (int i = 0; i < consumerCount - 1; i++) {
tasks.add(new ConsumeMessages(consumer(consumerGroup(), clientId(i + 1)), topics));
}
} else {
List<TopicPartition> partitions = populatePartitionsByTopic(consumer.consumer(), partitionsByTopic)
.values().stream().flatMap(List::stream).collect(Collectors.toList());
tasks.add(new ConsumeMessages(consumer, partitions));
for (int i = 0; i < consumerCount - 1; i++) {
tasks.add(new ConsumeMessages(consumer(consumerGroup(), clientId(i + 1)), partitions));
}
}
return tasks;
}
private KafkaConsumer<byte[], byte[]> consumer(String consumerGroup) {
private String clientId(int idx) {
return String.format("consumer.%s-%d", id, idx);
}
/**
* Creates a new KafkaConsumer instance
*/
private ThreadSafeConsumer consumer(String consumerGroup, String clientId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
// these defaults maybe over-written by the user-specified commonClientConf or consumerConf
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
return new ThreadSafeConsumer(new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()), clientId);
}
private String generateConsumerGroup() {
return "consume-bench-" + UUID.randomUUID().toString();
private String consumerGroup() {
return toUseRandomConsumeGroup()
? "consume-bench-" + UUID.randomUUID().toString()
: spec.consumerGroup();
}
private boolean toUseRandomConsumeGroup() {
return spec.consumerGroup().isEmpty();
}
private Map<String, List<TopicPartition>> populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> consumer,
@ -151,29 +196,35 @@ public class ConsumeBenchWorker implements TaskWorker {
private final Histogram messageSizeHistogram;
private final Future<?> statusUpdaterFuture;
private final Throttle throttle;
private final KafkaConsumer<byte[], byte[]> consumer;
private final String clientId;
private final ThreadSafeConsumer consumer;
ConsumeMessages(KafkaConsumer<byte[], byte[]> consumer, Map<String, List<TopicPartition>> topicPartitionsByTopic,
boolean toUseGroupAssignment) {
private ConsumeMessages(ThreadSafeConsumer consumer) {
this.latencyHistogram = new Histogram(5000);
this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
this.clientId = consumer.clientId();
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
this.consumer = consumer;
if (toUseGroupAssignment) {
Set<String> topics = topicPartitionsByTopic.keySet();
log.info("Will consume from topics {} via dynamic group assignment.", topics);
this.consumer.subscribe(topics);
} else {
List<TopicPartition> partitions = topicPartitionsByTopic.values().stream()
.flatMap(List::stream).collect(Collectors.toList());
log.info("Will consume from topic partitions {} via manual assignment.", partitions);
this.consumer.assign(partitions);
}
new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer), 1, 1, TimeUnit.MINUTES);
int perPeriod;
if (spec.targetMessagesPerSec() <= 0)
perPeriod = Integer.MAX_VALUE;
else
perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
int perPeriod = WorkerUtils.perSecToPerPeriod(
spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
this.consumer = consumer;
}
ConsumeMessages(ThreadSafeConsumer consumer, Set<String> topics) {
this(consumer);
log.info("Will consume from topics {} via dynamic group assignment.", topics);
this.consumer.subscribe(topics);
}
ConsumeMessages(ThreadSafeConsumer consumer, List<TopicPartition> partitions) {
this(consumer);
log.info("Will consume from topic partitions {} via manual assignment.", partitions);
this.consumer.assign(partitions);
}
@Override
@ -182,9 +233,10 @@ public class ConsumeBenchWorker implements TaskWorker {
long bytesConsumed = 0;
long startTimeMs = Time.SYSTEM.milliseconds();
long startBatchMs = startTimeMs;
int maxMessages = spec.maxMessages();
try {
while (messagesConsumed < spec.maxMessages()) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
while (messagesConsumed < maxMessages) {
ConsumerRecords<byte[], byte[]> records = consumer.poll();
if (records.isEmpty()) {
continue;
}
@ -202,6 +254,9 @@ public class ConsumeBenchWorker implements TaskWorker {
latencyHistogram.add(elapsedBatchMs);
messageSizeHistogram.add(messageBytes);
bytesConsumed += messageBytes;
if (messagesConsumed >= maxMessages)
break;
throttle.increment();
}
startBatchMs = Time.SYSTEM.milliseconds();
@ -211,23 +266,44 @@ public class ConsumeBenchWorker implements TaskWorker {
} finally {
statusUpdaterFuture.cancel(false);
StatusData statusData =
new StatusUpdater(latencyHistogram, messageSizeHistogram).update();
new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer).update();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("Consumed total number of messages={}, bytes={} in {} ms. status: {}",
messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
log.info("{} Consumed total number of messages={}, bytes={} in {} ms. status: {}",
clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
}
doneFuture.complete("");
consumer.close();
return null;
}
}
public class StatusUpdater implements Runnable {
private final Histogram latencyHistogram;
private final Histogram messageSizeHistogram;
public class CloseStatusUpdater implements Runnable {
private final List<Future<Void>> consumeTasks;
StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) {
this.latencyHistogram = latencyHistogram;
this.messageSizeHistogram = messageSizeHistogram;
CloseStatusUpdater(List<Future<Void>> consumeTasks) {
this.consumeTasks = consumeTasks;
}
@Override
public void run() {
while (!consumeTasks.stream().allMatch(Future::isDone)) {
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
log.debug("{} was interrupted. Closing...", this.getClass().getName());
break; // close the thread
}
}
statusUpdaterFuture.cancel(false);
statusUpdater.update();
}
}
class StatusUpdater implements Runnable {
final Map<String, JsonNode> statuses;
StatusUpdater() {
statuses = new HashMap<>();
}
@Override
@ -235,7 +311,39 @@ public class ConsumeBenchWorker implements TaskWorker {
try {
update();
} catch (Exception e) {
WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
}
}
synchronized void update() {
workerStatus.update(JsonUtil.JSON_SERDE.valueToTree(statuses));
}
synchronized void updateConsumeStatus(String clientId, StatusData status) {
statuses.put(clientId, JsonUtil.JSON_SERDE.valueToTree(status));
}
}
/**
* Runnable class that updates the status of a single consumer
*/
public class ConsumeStatusUpdater implements Runnable {
private final Histogram latencyHistogram;
private final Histogram messageSizeHistogram;
private final ThreadSafeConsumer consumer;
ConsumeStatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram, ThreadSafeConsumer consumer) {
this.latencyHistogram = latencyHistogram;
this.messageSizeHistogram = messageSizeHistogram;
this.consumer = consumer;
}
@Override
public void run() {
try {
update();
} catch (Exception e) {
WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
}
}
@ -243,6 +351,7 @@ public class ConsumeBenchWorker implements TaskWorker {
Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES);
Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES);
StatusData statusData = new StatusData(
consumer.assignedPartitions(),
latSummary.numSamples(),
(long) (msgSummary.numSamples() * msgSummary.average()),
(long) msgSummary.average(),
@ -250,7 +359,7 @@ public class ConsumeBenchWorker implements TaskWorker {
latSummary.percentiles().get(0).value(),
latSummary.percentiles().get(1).value(),
latSummary.percentiles().get(2).value());
status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
statusUpdater.updateConsumeStatus(consumer.clientId(), statusData);
log.info("Status={}", JsonUtil.toJsonString(statusData));
return statusData;
}
@ -258,6 +367,7 @@ public class ConsumeBenchWorker implements TaskWorker {
public static class StatusData {
private final long totalMessagesReceived;
private final List<String> assignedPartitions;
private final long totalBytesReceived;
private final long averageMessageSizeBytes;
private final float averageLatencyMs;
@ -270,15 +380,16 @@ public class ConsumeBenchWorker implements TaskWorker {
* These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
*/
final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
@JsonCreator
StatusData(@JsonProperty("totalMessagesReceived") long totalMessagesReceived,
StatusData(@JsonProperty("assignedPartitions") List<String> assignedPartitions,
@JsonProperty("totalMessagesReceived") long totalMessagesReceived,
@JsonProperty("totalBytesReceived") long totalBytesReceived,
@JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes,
@JsonProperty("averageLatencyMs") float averageLatencyMs,
@JsonProperty("p50LatencyMs") int p50latencyMs,
@JsonProperty("p95LatencyMs") int p95latencyMs,
@JsonProperty("p99LatencyMs") int p99latencyMs) {
this.assignedPartitions = assignedPartitions;
this.totalMessagesReceived = totalMessagesReceived;
this.totalBytesReceived = totalBytesReceived;
this.averageMessageSizeBytes = averageMessageSizeBytes;
@ -288,6 +399,11 @@ public class ConsumeBenchWorker implements TaskWorker {
this.p99LatencyMs = p99latencyMs;
}
@JsonProperty
public List<String> assignedPartitions() {
return assignedPartitions;
}
@JsonProperty
public long totalMessagesReceived() {
return totalMessagesReceived;
@ -333,11 +449,86 @@ public class ConsumeBenchWorker implements TaskWorker {
doneFuture.complete("");
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.DAYS);
Utils.closeQuietly(consumer, "consumer");
consumer.close();
this.consumer = null;
this.executor = null;
this.status = null;
this.statusUpdater = null;
this.statusUpdaterFuture = null;
this.workerStatus = null;
this.doneFuture = null;
}
/**
* A thread-safe KafkaConsumer wrapper
*/
private static class ThreadSafeConsumer {
private final KafkaConsumer<byte[], byte[]> consumer;
private final String clientId;
private final ReentrantLock consumerLock;
private boolean closed = false;
ThreadSafeConsumer(KafkaConsumer<byte[], byte[]> consumer, String clientId) {
this.consumer = consumer;
this.clientId = clientId;
this.consumerLock = new ReentrantLock();
}
ConsumerRecords<byte[], byte[]> poll() {
this.consumerLock.lock();
try {
return consumer.poll(Duration.ofMillis(50));
} finally {
this.consumerLock.unlock();
}
}
void close() {
if (closed)
return;
this.consumerLock.lock();
try {
consumer.unsubscribe();
Utils.closeQuietly(consumer, "consumer");
closed = true;
} finally {
this.consumerLock.unlock();
}
}
void subscribe(Set<String> topics) {
this.consumerLock.lock();
try {
consumer.subscribe(topics);
} finally {
this.consumerLock.unlock();
}
}
void assign(Collection<TopicPartition> partitions) {
this.consumerLock.lock();
try {
consumer.assign(partitions);
} finally {
this.consumerLock.unlock();
}
}
List<String> assignedPartitions() {
this.consumerLock.lock();
try {
return consumer.assignment().stream()
.map(TopicPartition::toString).collect(Collectors.toList());
} finally {
this.consumerLock.unlock();
}
}
String clientId() {
return clientId;
}
KafkaConsumer<byte[], byte[]> consumer() {
return consumer;
}
}
}

View File

@ -73,6 +73,6 @@ public class ConsumeBenchSpecTest {
private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
return new ConsumeBenchSpec(0, 0, "node", "localhost",
123, 1234, "cg-1",
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), activeTopics);
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 1, activeTopics);
}
}