KAFKA-18122: Added support for ShareConsumeBenchWorker (#17984)

Added ShareConsumeBenchSpec and ShareConsumeBenchWorker, analogous to ConsumeBenchSpec/ConsumeBenchWorker. This lets us run Trogdor workloads with share consumers as well.
Added a sample JSON workload that runs 5 share consumers.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
ShivsundarR 2024-12-05 08:16:32 -05:00 committed by GitHub
parent 2ad111ff3e
commit 50b6953661
4 changed files with 810 additions and 0 deletions

New file: sample share consume bench workload spec (JSON)

@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An example task specification for running a share consumer benchmark in Trogdor.
// See trogdor/README.md for details.
//
{
"class": "org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec",
"durationMs": 10000000,
"consumerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 1000,
"threadsPerWorker": 5,
"shareGroup": "sg",
"maxMessages": 10000,
"activeTopics": [ "foo[1-3]" ]
}
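
As a quick sanity check, here is a minimal sketch (not part of this commit) of binding this JSON to the spec class added below. The file name, the Jackson setup, and placing the sketch in the workload package (so the package-private expandTopicNames() is visible) are all assumptions made for illustration; inside Trogdor, the agent and coordinator resolve the top-level "class" field themselves.

// Hypothetical sketch; placed in the workload package so the package-private
// expandTopicNames() is visible.
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

import java.nio.file.Files;
import java.nio.file.Path;

public class ShareConsumeBenchSpecSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new Jdk8Module()) // for the Optional<RecordProcessor> field
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); // tolerate the Trogdor-level "class" field
        ShareConsumeBenchSpec spec = mapper.readValue(
                Files.readString(Path.of("share_consume_bench_spec.json")), // hypothetical file name
                ShareConsumeBenchSpec.class);
        // "threadsPerWorker": 5 spawns five share consumers, and "targetMessagesPerSec"
        // and "maxMessages" apply per consumer, so this spec targets up to 5 * 1000
        // messages/sec and at most 5 * 10000 messages in aggregate.
        System.out.println(spec.expandTopicNames()); // foo1, foo2, foo3 (order unspecified)
    }
}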

New file: ShareConsumeBenchSpec.java

@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.trogdor.common.StringExpander;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* The specification for a benchmark that consumes messages from a set of topics using a share group.
*
* If a share group is not given in the specification, the default group name "share" is used.
*
* Topics are specified via the "activeTopics" field. Unlike ConsumeBenchSpec, an explicit partition
* cannot be requested: partitions are assigned dynamically from the share group by the broker, so the
* "topic:partition" notation is invalid here and an expanded topic name must not contain a colon.
*
* The "activeTopics" field also supports ranges that get expanded. See {@link StringExpander}.
*
* Ranges provide a succinct way to represent multiple topics.
* Example:
* Given "activeTopics": ["foo[1-3]"], "foo[1-3]" gets
* expanded to [foo1, foo2, foo3].
*
* The consumer subscribes to the topics via
* {@link org.apache.kafka.clients.consumer.KafkaShareConsumer#subscribe(Collection)}.
* It is assigned partitions dynamically from the share group by the broker.
*
* This specification supports spawning multiple share consumers in a single Trogdor worker agent.
* The "threadsPerWorker" field denotes how many consumers should be spawned for this spec.
* Note that the "targetMessagesPerSec", "maxMessages" and "activeTopics" fields apply to every share
* consumer individually; for example, 5 threads each targeting 1000 messages/sec give the worker an
* aggregate target of 5000 messages/sec.
*
* The "recordProcessor" field allows the specification of tasks to run on records that are consumed.
* This is run immediately after the messages are polled. See the {@link RecordProcessor} interface
* for more information.
*
* An example JSON representation which will result in a share consumer that is part of the share
* group "sg" and subscribed to topics foo1, foo2, foo3 and bar:
*
* <pre>{@code
* {
* "class": "org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec",
* "durationMs": 10000000,
* "consumerNode": "node0",
* "bootstrapServers": "localhost:9092",
* "maxMessages": 100,
* "shareGroup": "sg",
* "activeTopics": ["foo[1-3]", "bar"]
* }
* }</pre>
*/
public final class ShareConsumeBenchSpec extends TaskSpec {
private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+$";
private static final String DEFAULT_SHARE_GROUP_NAME = "share";
private final String consumerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final long maxMessages;
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
private final List<String> activeTopics;
private final String shareGroup;
private final int threadsPerWorker;
private final Optional<RecordProcessor> recordProcessor;
@JsonCreator
public ShareConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("consumerNode") String consumerNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") long maxMessages,
@JsonProperty("consumerGroup") String shareGroup,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("threadsPerWorker") Integer threadsPerWorker,
@JsonProperty("recordProcessor") Optional<RecordProcessor> recordProcessor,
@JsonProperty("activeTopics") List<String> activeTopics) {
super(startMs, durationMs);
this.consumerNode = (consumerNode == null) ? "" : consumerNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
this.targetMessagesPerSec = targetMessagesPerSec;
this.maxMessages = maxMessages;
this.consumerConf = configOrEmptyMap(consumerConf);
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics;
this.shareGroup = shareGroup == null ? DEFAULT_SHARE_GROUP_NAME : shareGroup;
this.threadsPerWorker = threadsPerWorker == null ? 1 : threadsPerWorker;
this.recordProcessor = recordProcessor;
}
@JsonProperty
public String consumerNode() {
return consumerNode;
}
@JsonProperty
public String shareGroup() {
return shareGroup;
}
@JsonProperty
public String bootstrapServers() {
return bootstrapServers;
}
@JsonProperty
public int targetMessagesPerSec() {
return targetMessagesPerSec;
}
@JsonProperty
public long maxMessages() {
return maxMessages;
}
@JsonProperty
public int threadsPerWorker() {
return threadsPerWorker;
}
@JsonProperty
public Optional<RecordProcessor> recordProcessor() {
return this.recordProcessor;
}
@JsonProperty
public Map<String, String> consumerConf() {
return consumerConf;
}
@JsonProperty
public Map<String, String> commonClientConf() {
return commonClientConf;
}
@JsonProperty
public Map<String, String> adminClientConf() {
return adminClientConf;
}
@JsonProperty
public List<String> activeTopics() {
return activeTopics;
}
@Override
public TaskController newController(String id) {
return topology -> Collections.singleton(consumerNode);
}
@Override
public TaskWorker newTaskWorker(String id) {
return new ShareConsumeBenchWorker(id, this);
}
/**
* Materializes a list of topic names (optionally with ranges) into a set of expanded topic names
*
* Example:
* ['foo[1-3]', 'bar[1-2]'] => {'foo1', 'foo2', 'foo3', 'bar1', 'bar2'}
*/
Set<String> expandTopicNames() {
Set<String> expandedTopics = new HashSet<>();
for (String rawTopicName : this.activeTopics) {
Set<String> expandedNames = expandTopicName(rawTopicName);
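// Range expansion only substitutes digits, so if one expanded name is colon-free, all of them are.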
if (!expandedNames.iterator().next().matches(VALID_EXPANDED_TOPIC_NAME_PATTERN))
throw new IllegalArgumentException(String.format("Expanded topic name %s is invalid", expandedNames));
expandedTopics.addAll(expandedNames);
}
return expandedTopics;
}
/**
* Expands a topic name until there are no more ranges in it
*/
private Set<String> expandTopicName(String topicName) {
Set<String> expandedNames = StringExpander.expand(topicName);
if (expandedNames.size() == 1) {
return expandedNames;
}
Set<String> newNames = new HashSet<>();
for (String name : expandedNames) {
newNames.addAll(expandTopicName(name));
}
return newNames;
}
}
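
To make the recursive range expansion above concrete, here is a small self-contained sketch that mirrors the same semantics: expand one numeric range per pass and recurse until none remain. This is illustration only, not the actual StringExpander implementation; the regex and class name are assumptions.

package example; // hypothetical; the real expansion lives in org.apache.kafka.trogdor.common.StringExpander

import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RangeExpansionSketch {
    // Matches an inclusive numeric range such as "[1-3]".
    private static final Pattern RANGE = Pattern.compile("\\[(\\d+)-(\\d+)\\]");

    static Set<String> expand(String name) {
        Matcher m = RANGE.matcher(name);
        Set<String> out = new HashSet<>();
        if (!m.find()) {
            out.add(name); // no range left: the name is fully expanded
            return out;
        }
        for (int i = Integer.parseInt(m.group(1)); i <= Integer.parseInt(m.group(2)); i++) {
            // Substitute one value, then recurse so names with several ranges expand fully.
            out.addAll(expand(name.substring(0, m.start()) + i + name.substring(m.end())));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(expand("foo[1-3]"));       // foo1, foo2, foo3 (order unspecified)
        System.out.println(expand("foo[1-2]-[1-2]")); // foo1-1, foo1-2, foo2-1, foo2-2
    }
}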

New file: ShareConsumeBenchWorker.java

@@ -0,0 +1,498 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
public class ShareConsumeBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ShareConsumeBenchWorker.class);
private static final int THROTTLE_PERIOD_MS = 100;
private final String id;
private final ShareConsumeBenchSpec spec;
private final AtomicBoolean running = new AtomicBoolean(false);
private ScheduledExecutorService executor;
private WorkerStatusTracker workerStatus;
private StatusUpdater statusUpdater;
private Future<?> statusUpdaterFuture;
private KafkaFutureImpl<String> doneFuture;
public ShareConsumeBenchWorker(String id, ShareConsumeBenchSpec spec) {
this.id = id;
this.spec = spec;
}
@Override
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) throws Exception {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("ShareConsumeBenchWorker is already running.");
}
log.info("{}: Activating ShareConsumeBenchWorker with {}", id, spec);
this.statusUpdater = new StatusUpdater();
this.executor = Executors.newScheduledThreadPool(
spec.threadsPerWorker() + 2, // one thread per consumer, plus one for the CloseStatusUpdater and one shared by the periodic status updaters
ThreadUtils.createThreadFactory("ShareConsumeBenchWorkerThread%d", false));
this.statusUpdaterFuture = executor.scheduleAtFixedRate(this.statusUpdater, 1, 1, TimeUnit.MINUTES);
this.workerStatus = status;
this.doneFuture = doneFuture;
executor.submit(new Prepare());
}
public class Prepare implements Runnable {
@Override
public void run() {
try {
List<Future<Void>> consumeTasks = new ArrayList<>();
for (ConsumeMessages task : consumeTasks()) {
consumeTasks.add(executor.submit(task));
}
executor.submit(new CloseStatusUpdater(consumeTasks));
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
}
}
private List<ConsumeMessages> consumeTasks() {
List<ConsumeMessages> tasks = new ArrayList<>();
String shareGroup = shareGroup();
int consumerCount = spec.threadsPerWorker();
Set<String> topics = new HashSet<>(spec.expandTopicNames());
for (int i = 0; i < consumerCount; i++) {
tasks.add(new ConsumeMessages(consumer(shareGroup, clientId(i)), spec.recordProcessor(), topics));
}
return tasks;
}
private String clientId(int idx) {
return String.format("consumer.%s-%d", id, idx);
}
/**
* Creates a new KafkaShareConsumer, wrapped in a ThreadSafeShareConsumer
*/
private ThreadSafeShareConsumer consumer(String shareGroup, String clientId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
// these defaults may be overridden by the user-specified commonClientConf or consumerConf
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
return new ThreadSafeShareConsumer(new KafkaShareConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()), clientId);
}
private String shareGroup() {
return spec.shareGroup();
}
}
public class ConsumeMessages implements Callable<Void> {
private final Histogram latencyHistogram;
private final Histogram messageSizeHistogram;
private final Future<?> statusUpdaterFuture;
private final Throttle throttle;
private final String clientId;
private final ThreadSafeShareConsumer consumer;
private final Optional<RecordProcessor> recordProcessor;
private ConsumeMessages(ThreadSafeShareConsumer consumer,
Optional<RecordProcessor> recordProcessor) {
this.latencyHistogram = new Histogram(10000);
this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
this.clientId = consumer.clientId();
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer, recordProcessor), 1, 1, TimeUnit.MINUTES);
int perPeriod;
if (spec.targetMessagesPerSec() <= 0)
perPeriod = Integer.MAX_VALUE;
else
perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
this.consumer = consumer;
this.recordProcessor = recordProcessor;
}
ConsumeMessages(ThreadSafeShareConsumer consumer,
Optional<RecordProcessor> recordProcessor,
Set<String> topics) {
this(consumer, recordProcessor);
log.info("Will consume from topics {}.", topics);
this.consumer.subscribe(topics);
}
@Override
public Void call() throws Exception {
long messagesConsumed = 0;
long bytesConsumed = 0;
long startTimeMs = Time.SYSTEM.milliseconds();
long startBatchMs = startTimeMs;
long maxMessages = spec.maxMessages();
try {
while (messagesConsumed < maxMessages) {
ConsumerRecords<byte[], byte[]> records = consumer.poll();
if (records.isEmpty()) {
continue;
}
long endBatchMs = Time.SYSTEM.milliseconds();
long elapsedBatchMs = endBatchMs - startBatchMs;
// Do the record batch processing immediately to avoid latency skew.
recordProcessor.ifPresent(processor -> processor.processRecords(records));
for (ConsumerRecord<byte[], byte[]> record : records) {
messagesConsumed++;
long messageBytes = 0;
if (record.key() != null) {
messageBytes += record.serializedKeySize();
}
if (record.value() != null) {
messageBytes += record.serializedValueSize();
}
latencyHistogram.add(elapsedBatchMs);
messageSizeHistogram.add(messageBytes);
bytesConsumed += messageBytes;
if (messagesConsumed >= maxMessages)
break;
throttle.increment();
}
startBatchMs = Time.SYSTEM.milliseconds();
}
} catch (Exception e) {
WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture);
} finally {
statusUpdaterFuture.cancel(false);
StatusData statusData =
new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer, spec.recordProcessor()).update();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("{} Consumed total number of messages={}, bytes={} in {} ms. status: {}",
clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
}
consumer.close();
return null;
}
}
public class CloseStatusUpdater implements Runnable {
private final List<Future<Void>> consumeTasks;
CloseStatusUpdater(List<Future<Void>> consumeTasks) {
this.consumeTasks = consumeTasks;
}
@Override
public void run() {
while (!consumeTasks.stream().allMatch(Future::isDone)) {
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
log.debug("{} was interrupted. Closing...", this.getClass().getName());
break; // close the thread
}
}
statusUpdaterFuture.cancel(false);
statusUpdater.update();
doneFuture.complete("");
}
}
class StatusUpdater implements Runnable {
final Map<String, JsonNode> statuses;
StatusUpdater() {
statuses = new HashMap<>();
}
@Override
public void run() {
try {
update();
} catch (Exception e) {
WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
}
}
synchronized void update() {
workerStatus.update(JsonUtil.JSON_SERDE.valueToTree(statuses));
}
synchronized void updateConsumeStatus(String clientId, StatusData status) {
statuses.put(clientId, JsonUtil.JSON_SERDE.valueToTree(status));
}
}
/**
* Runnable class that updates the status of a single consumer
*/
public class ConsumeStatusUpdater implements Runnable {
private final Histogram latencyHistogram;
private final Histogram messageSizeHistogram;
private final ThreadSafeShareConsumer consumer;
private final Optional<RecordProcessor> recordProcessor;
ConsumeStatusUpdater(Histogram latencyHistogram,
Histogram messageSizeHistogram,
ThreadSafeShareConsumer consumer,
Optional<RecordProcessor> recordProcessor) {
this.latencyHistogram = latencyHistogram;
this.messageSizeHistogram = messageSizeHistogram;
this.consumer = consumer;
this.recordProcessor = recordProcessor;
}
@Override
public void run() {
try {
update();
} catch (Exception e) {
WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
}
}
StatusData update() {
Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES);
Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES);
// Parse out the RecordProcessor's status, if specified.
Optional<JsonNode> recordProcessorStatus = Optional.empty();
if (recordProcessor.isPresent()) {
recordProcessorStatus = Optional.of(recordProcessor.get().processorStatus());
}
StatusData statusData = new StatusData(
consumer.subscription(),
latSummary.numSamples(),
(long) (msgSummary.numSamples() * msgSummary.average()),
(long) msgSummary.average(),
latSummary.average(),
latSummary.percentiles().get(0).value(),
latSummary.percentiles().get(1).value(),
latSummary.percentiles().get(2).value(),
recordProcessorStatus);
statusUpdater.updateConsumeStatus(consumer.clientId(), statusData);
log.info("Status={}", JsonUtil.toJsonString(statusData));
return statusData;
}
}
public static class StatusData {
private final long totalMessagesReceived;
private final Set<String> subscription;
private final long totalBytesReceived;
private final long averageMessageSizeBytes;
private final float averageLatencyMs;
private final int p50LatencyMs;
private final int p95LatencyMs;
private final int p99LatencyMs;
private final Optional<JsonNode> recordProcessorStatus;
/**
* The percentiles to use when calculating the histogram data.
* These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
*/
static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
@JsonCreator
StatusData(@JsonProperty("subscription") Set<String> subscription,
@JsonProperty("totalMessagesReceived") long totalMessagesReceived,
@JsonProperty("totalBytesReceived") long totalBytesReceived,
@JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes,
@JsonProperty("averageLatencyMs") float averageLatencyMs,
@JsonProperty("p50LatencyMs") int p50latencyMs,
@JsonProperty("p95LatencyMs") int p95latencyMs,
@JsonProperty("p99LatencyMs") int p99latencyMs,
@JsonProperty("recordProcessorStatus") Optional<JsonNode> recordProcessorStatus) {
this.subscription = subscription;
this.totalMessagesReceived = totalMessagesReceived;
this.totalBytesReceived = totalBytesReceived;
this.averageMessageSizeBytes = averageMessageSizeBytes;
this.averageLatencyMs = averageLatencyMs;
this.p50LatencyMs = p50latencyMs;
this.p95LatencyMs = p95latencyMs;
this.p99LatencyMs = p99latencyMs;
this.recordProcessorStatus = recordProcessorStatus;
}
@JsonProperty
public Set<String> subscription() {
return subscription;
}
@JsonProperty
public long totalMessagesReceived() {
return totalMessagesReceived;
}
@JsonProperty
public long totalBytesReceived() {
return totalBytesReceived;
}
@JsonProperty
public long averageMessageSizeBytes() {
return averageMessageSizeBytes;
}
@JsonProperty
public float averageLatencyMs() {
return averageLatencyMs;
}
@JsonProperty
public int p50LatencyMs() {
return p50LatencyMs;
}
@JsonProperty
public int p95LatencyMs() {
return p95LatencyMs;
}
@JsonProperty
public int p99LatencyMs() {
return p99LatencyMs;
}
@JsonProperty
public JsonNode recordProcessorStatus() {
return recordProcessorStatus.orElse(null);
}
}
@Override
public void stop(Platform platform) throws Exception {
if (!running.compareAndSet(true, false)) {
throw new IllegalStateException("ShareConsumeBenchWorker is not running.");
}
log.info("{}: Deactivating ShareConsumeBenchWorker.", id);
doneFuture.complete("");
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.DAYS);
this.executor = null;
this.statusUpdater = null;
this.statusUpdaterFuture = null;
this.workerStatus = null;
this.doneFuture = null;
}
/**
* A thread-safe KafkaShareConsumer wrapper
*/
private static class ThreadSafeShareConsumer {
private final KafkaShareConsumer<byte[], byte[]> consumer;
private final String clientId;
private final ReentrantLock consumerLock;
private boolean closed = false;
ThreadSafeShareConsumer(KafkaShareConsumer<byte[], byte[]> consumer, String clientId) {
this.consumer = consumer;
this.clientId = clientId;
this.consumerLock = new ReentrantLock();
}
ConsumerRecords<byte[], byte[]> poll() {
this.consumerLock.lock();
try {
return consumer.poll(Duration.ofMillis(50));
} finally {
this.consumerLock.unlock();
}
}
void close() {
if (closed)
return;
this.consumerLock.lock();
try {
consumer.unsubscribe();
Utils.closeQuietly(consumer, "consumer");
closed = true;
} finally {
this.consumerLock.unlock();
}
}
void subscribe(Set<String> topics) {
this.consumerLock.lock();
try {
consumer.subscribe(topics);
} finally {
this.consumerLock.unlock();
}
}
Set<String> subscription() {
this.consumerLock.lock();
try {
return consumer.subscription();
} finally {
this.consumerLock.unlock();
}
}
String clientId() {
return clientId;
}
KafkaShareConsumer<byte[], byte[]> consumer() {
return consumer;
}
}
}
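
One detail worth calling out: the ConsumeMessages loop above spreads "targetMessagesPerSec" over 100 ms throttle windows via WorkerUtils.perSecToPerPeriod. Below is a hedged sketch of that arithmetic, not the real helper (which may round differently); the floor of 1 is an assumption so that very low rates still make progress each window.

public class ThrottleBudgetSketch {
    // Approximation of WorkerUtils.perSecToPerPeriod: converts a per-second target
    // into a per-window message budget.
    static int perSecToPerPeriod(float perSec, long periodMs) {
        int perPeriod = (int) (perSec * periodMs / 1000.0f);
        return Math.max(perPeriod, 1); // assumed floor so throttling never stalls entirely
    }

    public static void main(String[] args) {
        // With "targetMessagesPerSec": 1000 and THROTTLE_PERIOD_MS = 100, each share
        // consumer is allowed up to 100 throttle.increment() calls per 100 ms window.
        System.out.println(perSecToPerPeriod(1000, 100)); // 100
    }
}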

New file: ShareConsumeBenchSpecTest.java

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
class ShareConsumeBenchSpecTest {
@Test
public void testExpandTopicNames() {
ShareConsumeBenchSpec shareConsumeBenchSpec = shareConsumeBenchSpec(Arrays.asList("foo[1-3]", "bar"));
Set<String> expectedNames = new HashSet<>();
expectedNames.add("foo1");
expectedNames.add("foo2");
expectedNames.add("foo3");
expectedNames.add("bar");
assertEquals(expectedNames, shareConsumeBenchSpec.expandTopicNames());
}
@Test
public void testInvalidNameRaisesException() {
for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid[]", "in:valid:", "invalid[1-3]:")) {
assertThrows(IllegalArgumentException.class, () -> shareConsumeBenchSpec(Collections.singletonList(invalidName)).expandTopicNames());
}
}
private ShareConsumeBenchSpec shareConsumeBenchSpec(List<String> activeTopics) {
return new ShareConsumeBenchSpec(0, 0, "node", "localhost",
123, 1234, "sg-1",
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 1,
Optional.empty(), activeTopics);
}
}
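
The test passes Optional.empty() for the recordProcessor argument. For illustration, here is a hedged sketch of a processor that could fill that slot. It assumes only the two RecordProcessor methods this diff actually calls (processRecords and processorStatus); the real interface may carry additional Jackson annotations for polymorphic JSON binding.

package org.apache.kafka.trogdor.workload;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical processor that just counts consumed records.
public class CountingRecordProcessor implements RecordProcessor {
    private final AtomicLong totalRecords = new AtomicLong();

    @Override
    public void processRecords(ConsumerRecords<byte[], byte[]> records) {
        // Runs immediately after each poll, before the per-record accounting above.
        totalRecords.addAndGet(records.count());
    }

    @Override
    public JsonNode processorStatus() {
        // Surfaced in the worker's status JSON under "recordProcessorStatus".
        return JsonNodeFactory.instance.objectNode().put("totalRecords", totalRecords.get());
    }
}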