From d28c5348197256db09b59d1ebbfe7db9d3934f47 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Mon, 29 Oct 2018 19:51:07 +0200 Subject: [PATCH] KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810) This ConsumeBenchWorker now supports using consumer groups. The groups may be either used to store offsets, or as subscriptions. --- TROGDOR.md | 3 +- tests/bin/trogdor-run-consume-bench.sh | 7 +- .../trogdor/consume_bench_workload.py | 55 ++++++++ .../trogdor/produce_bench_workload.py | 6 +- .../tests/core/consume_bench_test.py | 132 ++++++++++++++++++ .../tests/core/produce_bench_test.py | 2 + .../trogdor/workload/ConsumeBenchSpec.java | 122 ++++++++++++++-- .../trogdor/workload/ConsumeBenchWorker.java | 94 ++++++++++--- .../kafka/trogdor/workload/TopicsSpec.java | 7 +- .../workload/ConsumeBenchSpecTest.java | 78 +++++++++++ 10 files changed, 459 insertions(+), 47 deletions(-) create mode 100644 tests/kafkatest/services/trogdor/consume_bench_workload.py create mode 100644 tests/kafkatest/tests/core/consume_bench_test.py create mode 100644 tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java diff --git a/TROGDOR.md b/TROGDOR.md index 3783d7e8193..d71455a6e49 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -141,7 +141,8 @@ ProduceBench starts a Kafka producer on a single agent node, producing to severa RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer. ### ConsumeBench -ConsumeBench starts a Kafka consumer on a single agent node. The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. +ConsumeBench starts a Kafka consumer on a single agent node. Depending on the passed in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself. +The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. Faults ======================================== diff --git a/tests/bin/trogdor-run-consume-bench.sh b/tests/bin/trogdor-run-consume-bench.sh index 2e0239e4b02..be9a2f1a941 100755 --- a/tests/bin/trogdor-run-consume-bench.sh +++ b/tests/bin/trogdor-run-consume-bench.sh @@ -26,12 +26,7 @@ cat < consumerConf; private final Map adminClientConf; private final Map commonClientConf; - private final TopicsSpec activeTopics; + private final List activeTopics; + private final String consumerGroup; @JsonCreator public ConsumeBenchSpec(@JsonProperty("startMs") long startMs, @@ -50,10 +96,11 @@ public class ConsumeBenchSpec extends TaskSpec { @JsonProperty("bootstrapServers") String bootstrapServers, @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, @JsonProperty("maxMessages") int maxMessages, + @JsonProperty("consumerGroup") String consumerGroup, @JsonProperty("consumerConf") Map consumerConf, @JsonProperty("commonClientConf") Map commonClientConf, @JsonProperty("adminClientConf") Map adminClientConf, - @JsonProperty("activeTopics") TopicsSpec activeTopics) { + @JsonProperty("activeTopics") List activeTopics) { super(startMs, durationMs); this.consumerNode = (consumerNode == null) ? "" : consumerNode; this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; @@ -62,7 +109,8 @@ public class ConsumeBenchSpec extends TaskSpec { this.consumerConf = configOrEmptyMap(consumerConf); this.commonClientConf = configOrEmptyMap(commonClientConf); this.adminClientConf = configOrEmptyMap(adminClientConf); - this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : activeTopics.immutableCopy(); + this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics; + this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : consumerGroup; } @JsonProperty @@ -70,6 +118,11 @@ public class ConsumeBenchSpec extends TaskSpec { return consumerNode; } + @JsonProperty + public String consumerGroup() { + return consumerGroup; + } + @JsonProperty public String bootstrapServers() { return bootstrapServers; @@ -101,22 +154,67 @@ public class ConsumeBenchSpec extends TaskSpec { } @JsonProperty - public TopicsSpec activeTopics() { + public List activeTopics() { return activeTopics; } @Override public TaskController newController(String id) { - return new TaskController() { - @Override - public Set targetNodes(Topology topology) { - return Collections.singleton(consumerNode); - } - }; + return topology -> Collections.singleton(consumerNode); } @Override public TaskWorker newTaskWorker(String id) { return new ConsumeBenchWorker(id, this); } + + /** + * Materializes a list of topic names (optionally with ranges) into a map of the topics and their partitions + * + * Example: + * ['foo[1-3]', 'foobar:2', 'bar[1-2]:[1-2]'] => {'foo1': [], 'foo2': [], 'foo3': [], 'foobar': [2], + * 'bar1': [1, 2], 'bar2': [1, 2] } + */ + Map> materializeTopics() { + Map> partitionsByTopics = new HashMap<>(); + + for (String rawTopicName : this.activeTopics) { + Set expandedNames = expandTopicName(rawTopicName); + if (!expandedNames.iterator().next().matches(VALID_EXPANDED_TOPIC_NAME_PATTERN)) + throw new IllegalArgumentException(String.format("Expanded topic name %s is invalid", rawTopicName)); + + for (String topicName : expandedNames) { + TopicPartition partition = null; + if (topicName.contains(":")) { + String[] topicAndPartition = topicName.split(":"); + topicName = topicAndPartition[0]; + partition = new TopicPartition(topicName, Integer.parseInt(topicAndPartition[1])); + } + if (!partitionsByTopics.containsKey(topicName)) { + partitionsByTopics.put(topicName, new ArrayList<>()); + } + if (partition != null) { + partitionsByTopics.get(topicName).add(partition); + } + } + } + + return partitionsByTopics; + } + + /** + * Expands a topic name until there are no more ranges in it + */ + private Set expandTopicName(String topicName) { + Set expandedNames = StringExpander.expand(topicName); + if (expandedNames.size() == 1) { + return expandedNames; + } + + Set newNames = new HashSet<>(); + for (String name : expandedNames) { + newNames.addAll(expandTopicName(name)); + } + return newNames; + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index c3a90e4da6a..b0998f0bdcf 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -39,9 +39,10 @@ import org.slf4j.LoggerFactory; import org.apache.kafka.trogdor.task.TaskWorker; import java.time.Duration; -import java.util.Collection; -import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -49,6 +50,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; public class ConsumeBenchWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class); @@ -86,18 +88,62 @@ public class ConsumeBenchWorker implements TaskWorker { @Override public void run() { try { - HashSet partitions = new HashSet<>(); - for (Map.Entry entry : spec.activeTopics().materialize().entrySet()) { - for (Integer partitionNumber : entry.getValue().partitionNumbers()) { - partitions.add(new TopicPartition(entry.getKey(), partitionNumber)); - } - } - log.info("Will consume from {}", partitions); - executor.submit(new ConsumeMessages(partitions)); + executor.submit(consumeTask()); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); } } + + private ConsumeMessages consumeTask() { + String consumerGroup = spec.consumerGroup(); + Map> partitionsByTopic = spec.materializeTopics(); + boolean toUseGroupPartitionAssignment = partitionsByTopic.values().isEmpty(); + + if (consumerGroup.equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP)) // consumer group is undefined, the consumer should use a random group + consumerGroup = generateConsumerGroup(); + + consumer = consumer(consumerGroup); + if (!toUseGroupPartitionAssignment) + partitionsByTopic = populatePartitionsByTopic(consumer, partitionsByTopic); + + return new ConsumeMessages(consumer, partitionsByTopic, toUseGroupPartitionAssignment); + } + + private KafkaConsumer consumer(String consumerGroup) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id); + props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); + // these defaults maybe over-written by the user-specified commonClientConf or consumerConf + WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf()); + return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + + private String generateConsumerGroup() { + return "consume-bench-" + UUID.randomUUID().toString(); + } + + private Map> populatePartitionsByTopic(KafkaConsumer consumer, + Map> materializedTopics) { + // fetch partitions for topics who do not have any listed + for (Map.Entry> entry : materializedTopics.entrySet()) { + String topicName = entry.getKey(); + List partitions = entry.getValue(); + + if (partitions.isEmpty()) { + List fetchedPartitions = consumer.partitionsFor(topicName).stream() + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); + partitions.addAll(fetchedPartitions); + } + + materializedTopics.put(topicName, partitions); + } + + return materializedTopics; + } } public class ConsumeMessages implements Callable { @@ -105,24 +151,26 @@ public class ConsumeBenchWorker implements TaskWorker { private final Histogram messageSizeHistogram; private final Future statusUpdaterFuture; private final Throttle throttle; + private final KafkaConsumer consumer; - ConsumeMessages(Collection topicPartitions) { + ConsumeMessages(KafkaConsumer consumer, Map> topicPartitionsByTopic, + boolean toUseGroupAssignment) { this.latencyHistogram = new Histogram(5000); this.messageSizeHistogram = new Histogram(2 * 1024 * 1024); this.statusUpdaterFuture = executor.scheduleAtFixedRate( new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES); - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); - // these defaults maybe over-written by the user-specified commonClientConf or - // consumerConf - WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf()); - consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), - new ByteArrayDeserializer()); - consumer.assign(topicPartitions); + this.consumer = consumer; + if (toUseGroupAssignment) { + Set topics = topicPartitionsByTopic.keySet(); + log.info("Will consume from topics {} via dynamic group assignment.", topics); + this.consumer.subscribe(topics); + } else { + List partitions = topicPartitionsByTopic.values().stream() + .flatMap(List::stream).collect(Collectors.toList()); + log.info("Will consume from topic partitions {} via manual assignment.", partitions); + this.consumer.assign(partitions); + } + int perPeriod = WorkerUtils.perSecToPerPeriod( spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java index a9b550d648c..dcb8d8ad50b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java @@ -80,9 +80,10 @@ public class TopicsSpec extends Message { public Map materialize() { HashMap all = new HashMap<>(); for (Map.Entry entry : map.entrySet()) { - for (String topicName : StringExpander.expand(entry.getKey())) { - all.put(topicName, entry.getValue()); - } + String topicName = entry.getKey(); + PartitionsSpec partitions = entry.getValue(); + for (String expandedTopicName : StringExpander.expand(topicName)) + all.put(expandedTopicName, partitions); } return all; } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java new file mode 100644 index 00000000000..117954b7caa --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ConsumeBenchSpecTest { + + @Test + public void testMaterializeTopicsWithNoPartitions() { + Map> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]", "secondTopic")).materializeTopics(); + Map> expected = new HashMap<>(); + expected.put("topic1", new ArrayList<>()); + expected.put("topic2", new ArrayList<>()); + expected.put("topic3", new ArrayList<>()); + expected.put("secondTopic", new ArrayList<>()); + + assertEquals(expected, materializedTopics); + } + + @Test + public void testMaterializeTopicsWithSomePartitions() { + Map> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1")).materializeTopics(); + Map> expected = new HashMap<>(); + expected.put("topic1", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic1", (int) i)).collect(Collectors.toList())); + expected.put("topic2", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic2", (int) i)).collect(Collectors.toList())); + expected.put("topic3", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic3", (int) i)).collect(Collectors.toList())); + expected.put("secondTopic", new ArrayList<>()); + expected.put("thirdTopic", Collections.singletonList(new TopicPartition("thirdTopic", 1))); + + assertEquals(expected, materializedTopics); + } + + @Test + public void testInvalidTopicNameRaisesExceptionInMaterialize() { + for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid", "in:valid:1", "invalid:2:2", "invalid::1", "invalid[1-3]:")) { + try { + consumeBenchSpec(Collections.singletonList(invalidName)).materializeTopics(); + fail(String.format("Invalid topic name (%s) should have raised an exception.", invalidName)); + } catch (IllegalArgumentException ignored) { } + } + + } + + private ConsumeBenchSpec consumeBenchSpec(List activeTopics) { + return new ConsumeBenchSpec(0, 0, "node", "localhost", + 123, 1234, "cg-1", + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), activeTopics); + } +}