KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810)

The ConsumeBenchWorker now supports using consumer groups. A group may be used either just to store offsets (with manually assigned partitions) or for topic subscription (with dynamically assigned partitions).
Stanislav Kozlovski 2018-10-29 19:51:07 +02:00 committed by Colin Patrick McCabe
parent 9a0ea25fee
commit d28c534819
10 changed files with 459 additions and 47 deletions
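For reference, the new "consumerGroup" field slots into a ConsumeBenchSpec task as shown below (this JSON is the example from the updated ConsumeBenchSpec javadoc further down):

{
  "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
  "durationMs": 10000000,
  "consumerNode": "node0",
  "bootstrapServers": "localhost:9092",
  "maxMessages": 100,
  "consumerGroup": "cg",
  "activeTopics": ["foo[1-3]", "bar"]
}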


@@ -141,7 +141,8 @@ ProduceBench starts a Kafka producer on a single agent node, producing to several
RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer.
### ConsumeBench
ConsumeBench starts a Kafka consumer on a single agent node. The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency.
ConsumeBench starts a Kafka consumer on a single agent node. Depending on the passed-in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself.
The workload measures the average message latency, as well as the median, 95th percentile, and 99th percentile latency.
Faults
========================================


@@ -26,12 +26,7 @@ cat <<EOF
"consumerNode": "node0",
"bootstrapServers": "localhost:9092",
"maxMessages": 100,
"activeTopics": {
"foo[1-3]": {
"numPartitions": 3,
"replicationFactor": 1
}
}
"activeTopics": ["foo[1-3]"]
}
}
EOF
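With the new list format, individual partitions can also be targeted using the topic:partition notation documented in ConsumeBenchSpec, and ranges expand on both sides. For example,

"activeTopics": ["foo[1-3]:[1-3]"]

expands to partitions 1-3 of each of the topics foo1, foo2 and foo3.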


@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
from kafkatest.services.trogdor.task_spec import TaskSpec
class ConsumeBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
target_messages_per_sec, max_messages, active_topics,
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None):
super(ConsumeBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.ConsumeBenchSpec"
self.message["consumerNode"] = consumer_node
self.message["bootstrapServers"] = bootstrap_servers
self.message["targetMessagesPerSec"] = target_messages_per_sec
self.message["maxMessages"] = max_messages
self.message["consumerConf"] = consumer_conf
self.message["adminClientConf"] = admin_client_conf
self.message["commonClientConf"] = common_client_conf
self.message["activeTopics"] = active_topics
if consumer_group is not None:
self.message["consumerGroup"] = consumer_group
class ConsumeBenchWorkloadService(Service):
def __init__(self, context, kafka):
Service.__init__(self, context, num_nodes=1)
self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
self.consumer_node = self.nodes[0].account.hostname
def free(self):
Service.free(self)
def wait_node(self, node, timeout_sec=None):
pass
def stop_node(self, node):
pass
def clean_node(self, node):
pass
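Note that wait_node, stop_node and clean_node are deliberate no-ops: this service only reserves a node and exposes its hostname and the cluster's bootstrap servers, while the consumer itself is run by the Trogdor agent (see the ducktape test below).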


@@ -20,8 +20,8 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
class ProduceBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
target_messages_per_sec, max_messages, producer_conf,
inactive_topics, active_topics):
target_messages_per_sec, max_messages, producer_conf, admin_client_conf,
common_client_conf, inactive_topics, active_topics):
super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec"
self.message["producerNode"] = producer_node
@@ -29,6 +29,8 @@ class ProduceBenchWorkloadSpec(TaskSpec):
self.message["targetMessagesPerSec"] = target_messages_per_sec
self.message["maxMessages"] = max_messages
self.message["producerConf"] = producer_conf
self.message["adminClientConf"] = admin_client_conf
self.message["commonClientConf"] = common_client_conf
self.message["inactiveTopics"] = inactive_topics
self.message["activeTopics"] = active_topics


@@ -0,0 +1,132 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from ducktape.mark import parametrize
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService
from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.trogdor.trogdor import TrogdorService
from kafkatest.services.zookeeper import ZookeeperService
class ConsumeBenchTest(Test):
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(ConsumeBenchTest, self).__init__(test_context)
self.zk = ZookeeperService(test_context, num_nodes=3)
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka)
self.consumer_workload_service_2 = ConsumeBenchWorkloadService(test_context, self.kafka)
self.active_topics = {"consume_bench_topic[0-5]": {"numPartitions": 5, "replicationFactor": 3}}
self.trogdor = TrogdorService(context=self.test_context,
client_services=[self.kafka, self.producer_workload_service,
self.consumer_workload_service,
self.consumer_workload_service_2])
def setUp(self):
self.trogdor.start()
self.zk.start()
self.kafka.start()
def teardown(self):
self.trogdor.stop()
self.kafka.stop()
self.zk.stop()
def produce_messages(self, topics, max_messages=10000):
produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.producer_workload_service.producer_node,
self.producer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=max_messages,
producer_conf={},
admin_client_conf={},
common_client_conf={},
inactive_topics={},
active_topics=topics)
produce_workload = self.trogdor.create_task("produce_workload", produce_spec)
produce_workload.wait_for_done(timeout_sec=180)
self.logger.debug("Produce workload finished")
@parametrize(topics=["consume_bench_topic[0-5]"]) # topic subscription
@parametrize(topics=["consume_bench_topic[0-5]:[0-4]"]) # manual topic assignment
def test_consume_bench(self, topics):
"""
Runs a ConsumeBench workload to consume messages
"""
self.produce_messages(self.active_topics)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=10000,
consumer_conf={},
admin_client_conf={},
common_client_conf={},
active_topics=topics)
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
def test_consume_bench_single_partition(self):
"""
Run a ConsumeBench against a single partition
"""
active_topics = {"consume_bench_topic": {"numPartitions": 2, "replicationFactor": 3}}
self.produce_messages(active_topics, 5000)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=2500,
consumer_conf={},
admin_client_conf={},
common_client_conf={},
active_topics=["consume_bench_topic:1"])
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=180)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
def test_consume_group_bench(self):
"""
Runs two ConsumeBench workloads in the same consumer group to read messages from topics
"""
self.produce_messages(self.active_topics)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=2000, # both should read at least 2k messages
consumer_conf={},
admin_client_conf={},
common_client_conf={},
consumer_group="testGroup",
active_topics=["consume_bench_topic[0-5]"])
consume_workload_1 = self.trogdor.create_task("consume_workload_1", consume_spec)
consume_workload_2 = self.trogdor.create_task("consume_workload_2", consume_spec)
consume_workload_1.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload 1 finished")
consume_workload_2.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload 2 finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
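For reference, each of the two tasks submitted in test_consume_group_bench serializes to a ConsumeBenchSpec along these lines (values taken from the test; the durationMs, node and bootstrap values below are placeholders):

{
  "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
  "startMs": 0,
  "durationMs": <TaskSpec.MAX_DURATION_MS>,
  "consumerNode": <consumer node hostname>,
  "bootstrapServers": <bootstrap servers>,
  "targetMessagesPerSec": 1000,
  "maxMessages": 2000,
  "consumerGroup": "testGroup",
  "activeTopics": ["consume_bench_topic[0-5]"]
}

Because both tasks share the group "testGroup", the partitions of the six topics are split between the two consumers by the group coordinator.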


@@ -51,6 +51,8 @@ class ProduceBenchTest(Test):
target_messages_per_sec=1000,
max_messages=100000,
producer_conf={},
admin_client_conf={},
common_client_conf={},
inactive_topics=inactive_topics,
active_topics=active_topics)
workload1 = self.trogdor.create_task("workload1", spec)


@@ -20,20 +20,65 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.Topology;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.trogdor.common.StringExpander;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
/**
* The specification for a benchmark that produces messages to a set of topics.
* The specification for a benchmark that consumes messages from a set of topics/partitions.
*
* If a consumer group is not given to the specification, a random one will be generated and
* used to track offsets/subscribe to topics.
*
* This specification uses a dedicated notation to represent a topic partition in its "activeTopics" field.
* The notation is topic_name:partition_number (e.g. "foo:1" represents partition 1 of topic "foo").
* Note that a topic name cannot contain more than one colon.
*
* The "activeTopics" field also supports ranges that get expanded. See #{@link StringExpander}.
*
* Ranges can also be combined to represent multiple partitions of multiple topics in a succinct way.
* Example:
* Given "activeTopics": ["foo[1-3]:[1-3]"], "foo[1-3]:[1-3]" will get
* expanded to [foo1:1, foo1:2, foo1:3, foo2:1, ..., foo3:3].
* This represents all partitions 1-3 for the three topics foo1, foo2 and foo3.
*
* If there is at least one topic:partition pair, the consumer will be manually assigned partitions via
* #{@link org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)}.
* Note that in this case the consumer will fetch and assign all of a topic's partitions if no partition is given for it (e.g. ["foo:1", "bar"])
*
* If there are no topic:partition pairs given, the consumer will subscribe to the topics via
* #{@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)}.
* It will be assigned partitions dynamically from the consumer group.
*
* Here is an example JSON representation which results in a consumer that is part of the consumer group "cg" and
* is subscribed to topics foo1, foo2, foo3 and bar:
* #{@code
* {
* "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
* "durationMs": 10000000,
* "consumerNode": "node0",
* "bootstrapServers": "localhost:9092",
* "maxMessages": 100,
* "consumerGroup": "cg",
* "activeTopics": ["foo[1-3]", "bar"]
* }
* }
*/
public class ConsumeBenchSpec extends TaskSpec {
static final String EMPTY_CONSUMER_GROUP = "";
private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+(:[\\d]+|[^:]*)$";
private final String consumerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
@@ -41,7 +86,8 @@ public class ConsumeBenchSpec extends TaskSpec {
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
private final TopicsSpec activeTopics;
private final List<String> activeTopics;
private final String consumerGroup;
@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -50,10 +96,11 @@ public class ConsumeBenchSpec extends TaskSpec {
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") int maxMessages,
@JsonProperty("consumerGroup") String consumerGroup,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("activeTopics") TopicsSpec activeTopics) {
@JsonProperty("activeTopics") List<String> activeTopics) {
super(startMs, durationMs);
this.consumerNode = (consumerNode == null) ? "" : consumerNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@@ -62,7 +109,8 @@ public class ConsumeBenchSpec extends TaskSpec {
this.consumerConf = configOrEmptyMap(consumerConf);
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : activeTopics.immutableCopy();
this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics;
this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : consumerGroup;
}
@JsonProperty
@@ -70,6 +118,11 @@ public class ConsumeBenchSpec extends TaskSpec {
return consumerNode;
}
@JsonProperty
public String consumerGroup() {
return consumerGroup;
}
@JsonProperty
public String bootstrapServers() {
return bootstrapServers;
@@ -101,22 +154,67 @@ public class ConsumeBenchSpec extends TaskSpec {
}
@JsonProperty
public TopicsSpec activeTopics() {
public List<String> activeTopics() {
return activeTopics;
}
@Override
public TaskController newController(String id) {
return new TaskController() {
@Override
public Set<String> targetNodes(Topology topology) {
return Collections.singleton(consumerNode);
}
};
return topology -> Collections.singleton(consumerNode);
}
@Override
public TaskWorker newTaskWorker(String id) {
return new ConsumeBenchWorker(id, this);
}
/**
* Materializes a list of topic names (optionally with ranges) into a map of the topics and their partitions
*
* Example:
* ['foo[1-3]', 'foobar:2', 'bar[1-2]:[1-2]'] => {'foo1': [], 'foo2': [], 'foo3': [], 'foobar': [2],
* 'bar1': [1, 2], 'bar2': [1, 2] }
*/
Map<String, List<TopicPartition>> materializeTopics() {
Map<String, List<TopicPartition>> partitionsByTopics = new HashMap<>();
for (String rawTopicName : this.activeTopics) {
Set<String> expandedNames = expandTopicName(rawTopicName);
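            // ranges expand to numbers only, so validating one expanded name is enough to validate them all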
if (!expandedNames.iterator().next().matches(VALID_EXPANDED_TOPIC_NAME_PATTERN))
throw new IllegalArgumentException(String.format("Expanded topic name %s is invalid", rawTopicName));
for (String topicName : expandedNames) {
TopicPartition partition = null;
if (topicName.contains(":")) {
String[] topicAndPartition = topicName.split(":");
topicName = topicAndPartition[0];
partition = new TopicPartition(topicName, Integer.parseInt(topicAndPartition[1]));
}
if (!partitionsByTopics.containsKey(topicName)) {
partitionsByTopics.put(topicName, new ArrayList<>());
}
if (partition != null) {
partitionsByTopics.get(topicName).add(partition);
}
}
}
return partitionsByTopics;
}
/**
* Expands a topic name until there are no more ranges in it
*/
private Set<String> expandTopicName(String topicName) {
Set<String> expandedNames = StringExpander.expand(topicName);
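        // a single result means no range was expanded in this pass (StringExpander appears to expand one range per call)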
if (expandedNames.size() == 1) {
return expandedNames;
}
Set<String> newNames = new HashSet<>();
for (String name : expandedNames) {
newNames.addAll(expandTopicName(name));
}
return newNames;
}
}
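As a rough sketch of how the new notation materializes (a hypothetical snippet with imports omitted; the constructor argument order follows ConsumeBenchSpecTest at the bottom of this change):

ConsumeBenchSpec spec = new ConsumeBenchSpec(0, 30000, "node0", "localhost:9092",
        1000, 10000, "cg-1",
        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
        Arrays.asList("foo[1-2]:[1-2]", "bar"));
Map<String, List<TopicPartition>> materialized = spec.materializeTopics();
// materialized => {foo1=[foo1-1, foo1-2], foo2=[foo2-1, foo2-2], bar=[]}
// since at least one topic:partition pair is present, ConsumeBenchWorker will
// manually assign the partitions, first fetching all of "bar"'s partitions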


@@ -39,9 +39,10 @@ import org.slf4j.LoggerFactory;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class ConsumeBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
@@ -86,18 +88,62 @@
@Override
public void run() {
try {
HashSet<TopicPartition> partitions = new HashSet<>();
for (Map.Entry<String, PartitionsSpec> entry : spec.activeTopics().materialize().entrySet()) {
for (Integer partitionNumber : entry.getValue().partitionNumbers()) {
partitions.add(new TopicPartition(entry.getKey(), partitionNumber));
}
}
log.info("Will consume from {}", partitions);
executor.submit(new ConsumeMessages(partitions));
executor.submit(consumeTask());
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
}
}
private ConsumeMessages consumeTask() {
String consumerGroup = spec.consumerGroup();
Map<String, List<TopicPartition>> partitionsByTopic = spec.materializeTopics();
boolean toUseGroupPartitionAssignment = partitionsByTopic.values().stream().allMatch(List::isEmpty);
if (consumerGroup.equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP)) // consumer group is undefined, the consumer should use a random group
consumerGroup = generateConsumerGroup();
consumer = consumer(consumerGroup);
if (!toUseGroupPartitionAssignment)
partitionsByTopic = populatePartitionsByTopic(consumer, partitionsByTopic);
return new ConsumeMessages(consumer, partitionsByTopic, toUseGroupPartitionAssignment);
}
private KafkaConsumer<byte[], byte[]> consumer(String consumerGroup) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
// these defaults may be overwritten by the user-specified commonClientConf or consumerConf
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
private String generateConsumerGroup() {
return "consume-bench-" + UUID.randomUUID().toString();
}
private Map<String, List<TopicPartition>> populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> consumer,
Map<String, List<TopicPartition>> materializedTopics) {
// fetch partitions for any topic that does not have them listed
for (Map.Entry<String, List<TopicPartition>> entry : materializedTopics.entrySet()) {
String topicName = entry.getKey();
List<TopicPartition> partitions = entry.getValue();
if (partitions.isEmpty()) {
List<TopicPartition> fetchedPartitions = consumer.partitionsFor(topicName).stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
partitions.addAll(fetchedPartitions);
}
materializedTopics.put(topicName, partitions);
}
return materializedTopics;
}
}
public class ConsumeMessages implements Callable<Void> {
@@ -105,24 +151,26 @@
private final Histogram messageSizeHistogram;
private final Future<?> statusUpdaterFuture;
private final Throttle throttle;
private final KafkaConsumer<byte[], byte[]> consumer;
ConsumeMessages(Collection<TopicPartition> topicPartitions) {
ConsumeMessages(KafkaConsumer<byte[], byte[]> consumer, Map<String, List<TopicPartition>> topicPartitionsByTopic,
boolean toUseGroupAssignment) {
this.latencyHistogram = new Histogram(5000);
this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
// these defaults maybe over-written by the user-specified commonClientConf or
// consumerConf
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
consumer.assign(topicPartitions);
this.consumer = consumer;
if (toUseGroupAssignment) {
Set<String> topics = topicPartitionsByTopic.keySet();
log.info("Will consume from topics {} via dynamic group assignment.", topics);
this.consumer.subscribe(topics);
} else {
List<TopicPartition> partitions = topicPartitionsByTopic.values().stream()
.flatMap(List::stream).collect(Collectors.toList());
log.info("Will consume from topic partitions {} via manual assignment.", partitions);
this.consumer.assign(partitions);
}
int perPeriod = WorkerUtils.perSecToPerPeriod(
spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);


@@ -80,9 +80,10 @@ public class TopicsSpec extends Message {
public Map<String, PartitionsSpec> materialize() {
HashMap<String, PartitionsSpec> all = new HashMap<>();
for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) {
for (String topicName : StringExpander.expand(entry.getKey())) {
all.put(topicName, entry.getValue());
}
String topicName = entry.getKey();
PartitionsSpec partitions = entry.getValue();
for (String expandedTopicName : StringExpander.expand(topicName))
all.put(expandedTopicName, partitions);
}
return all;
}
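For context, StringExpander.expand is what turns a range pattern into concrete names, e.g. (assumed behavior, matching the javadoc examples in ConsumeBenchSpec above):

Set<String> names = StringExpander.expand("foo[1-3]");
// names => {"foo1", "foo2", "foo3"}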


@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class ConsumeBenchSpecTest {
@Test
public void testMaterializeTopicsWithNoPartitions() {
Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]", "secondTopic")).materializeTopics();
Map<String, List<TopicPartition>> expected = new HashMap<>();
expected.put("topic1", new ArrayList<>());
expected.put("topic2", new ArrayList<>());
expected.put("topic3", new ArrayList<>());
expected.put("secondTopic", new ArrayList<>());
assertEquals(expected, materializedTopics);
}
@Test
public void testMaterializeTopicsWithSomePartitions() {
Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1")).materializeTopics();
Map<String, List<TopicPartition>> expected = new HashMap<>();
expected.put("topic1", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic1", (int) i)).collect(Collectors.toList()));
expected.put("topic2", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic2", (int) i)).collect(Collectors.toList()));
expected.put("topic3", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic3", (int) i)).collect(Collectors.toList()));
expected.put("secondTopic", new ArrayList<>());
expected.put("thirdTopic", Collections.singletonList(new TopicPartition("thirdTopic", 1)));
assertEquals(expected, materializedTopics);
}
@Test
public void testInvalidTopicNameRaisesExceptionInMaterialize() {
for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid", "in:valid:1", "invalid:2:2", "invalid::1", "invalid[1-3]:")) {
try {
consumeBenchSpec(Collections.singletonList(invalidName)).materializeTopics();
fail(String.format("Invalid topic name (%s) should have raised an exception.", invalidName));
} catch (IllegalArgumentException ignored) { }
}
}
private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
return new ConsumeBenchSpec(0, 0, "node", "localhost",
123, 1234, "cg-1",
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), activeTopics);
}
}