KAFKA-17593; [8/N] Resolve regular expressions (#17864)

This patch introduces the asynchronous resolution of regular expressions. Let me unpack a few details about the implementation:
1) I have decided to update all the regular expressions within a consumer group together. My assumption is that the number of regular expressions in a group will generally be small while the number of topics in a cluster is large. Hence, grouping has two benefits. Firstly, it allows going through the list of topics only once for all the regular expressions. Secondly, it reduces the number of potential rebalances because all the regular expressions are updated at the same time.
2) An update is triggered when the group is subscribed to at least one regular expression.
3) An update is triggered when there is no ongoing update.
4) An update is triggered only if the previous one is older than 10s.
5) An update is triggered when the group has unresolved regular expressions.
6) An update is triggered when the metadata image has new topics (see the sketch below).
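
Conditions 2-6 above decide when a batched refresh is scheduled on the coordinator executor. Below is a minimal, illustrative sketch of that gating logic; the names (RegexRefreshGate, GroupState) are hypothetical, and the actual checks live in GroupMetadataManager#maybeUpdateRegularExpressions further down in this patch.

// Hypothetical sketch of the refresh gating described in points 2-6; not part of the patch.
final class RegexRefreshGate {
    // Point 4: two consecutive refreshes of a group's regexes are at least 10 seconds apart.
    private static final long REFRESH_INTERVAL_MS = 10_000L;

    static boolean shouldRefresh(GroupState group, long nowMs, long lastImageOffsetWithNewTopics) {
        if (group.subscribedRegexes().isEmpty()) return false;                                        // point 2
        if (group.hasOngoingRefresh()) return false;                                                  // point 3
        if (nowMs <= group.lastRefreshTimeMs() + REFRESH_INTERVAL_MS) return false;                   // point 4
        boolean hasUnresolvedRegexes = group.subscribedRegexes().size() > group.numResolvedRegexes(); // point 5
        boolean imageHasNewTopics = group.lastResolvedRegexVersion() < lastImageOffsetWithNewTopics;  // point 6
        return hasUnresolvedRegexes || imageHasNewTopics;
    }

    // Hypothetical read-only view of the group state needed by the check.
    interface GroupState {
        java.util.Set<String> subscribedRegexes();
        int numResolvedRegexes();
        boolean hasOngoingRefresh();
        long lastRefreshTimeMs();
        long lastResolvedRegexVersion();
    }
}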

Reviewers: Jeff Kim <jeff.kim@confluent.io>
David Jacot 2024-11-26 17:56:25 +01:00 committed by GitHub
parent 11ce41fb1d
commit 24dd11d693
13 changed files with 1264 additions and 44 deletions


@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
public class MockCoordinatorExecutor<T> implements CoordinatorExecutor<T> {
private class ExecutorTask<R> {
public final String key;
public final TaskRunnable<R> task;
public final TaskOperation<T, R> operation;
ExecutorTask(
String key,
TaskRunnable<R> task,
TaskOperation<T, R> operation
) {
this.key = Objects.requireNonNull(key);
this.task = Objects.requireNonNull(task);
this.operation = Objects.requireNonNull(operation);
}
CoordinatorResult<Void, T> execute() {
try {
return operation.onComplete(task.run(), null);
} catch (Throwable ex) {
return operation.onComplete(null, ex);
}
}
}
public static class ExecutorResult<T> {
public final String key;
public final CoordinatorResult<Void, T> result;
public ExecutorResult(
String key,
CoordinatorResult<Void, T> result
) {
this.key = Objects.requireNonNull(key);
this.result = Objects.requireNonNull(result);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecutorResult<?> that = (ExecutorResult<?>) o;
if (!Objects.equals(key, that.key)) return false;
return Objects.equals(result, that.result);
}
@Override
public int hashCode() {
int result = key.hashCode();
result = 31 * result + this.result.hashCode();
return result;
}
@Override
public String toString() {
return "ExecutorResult(" +
"key='" + key + '\'' +
", result=" + result +
')';
}
}
private final Map<String, TaskRunnable<?>> tasks = new HashMap<>();
private final Queue<ExecutorTask<?>> queue = new ArrayDeque<>();
@Override
public <R> boolean schedule(
String key,
TaskRunnable<R> task,
TaskOperation<T, R> operation
) {
if (tasks.putIfAbsent(key, task) != null) return false;
return queue.add(new ExecutorTask<>(key, task, operation));
}
@Override
public boolean isScheduled(String key) {
return tasks.containsKey(key);
}
public int size() {
return queue.size();
}
@Override
public void cancel(String key) {
tasks.remove(key);
}
public List<ExecutorResult<T>> poll() {
List<ExecutorResult<T>> results = new ArrayList<>();
for (ExecutorTask<?> task : queue) {
tasks.remove(task.key, task.task);
results.add(new ExecutorResult<>(task.key, task.execute()));
}
queue.clear();
return results;
}
}
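
A hedged usage sketch (not part of the patch): tasks queued via schedule only execute when poll() is called, which is what lets the tests further down drive the asynchronous regex resolution deterministically. The wrapper class and the String payload type here are illustrative only.

import java.util.Collections;
import java.util.List;

// Illustrative only; assumes MockCoordinatorExecutor and CoordinatorResult from this patch are on the classpath.
class MockCoordinatorExecutorUsageSketch {
    static void example() {
        MockCoordinatorExecutor<String> executor = new MockCoordinatorExecutor<>();
        executor.schedule(
            "group-regex",
            () -> "resolved",                                                        // TaskRunnable: the asynchronous work
            (result, exception) -> new CoordinatorResult<>(Collections.emptyList())  // TaskOperation: turns the result into records
        );
        // Nothing runs until poll() is invoked; the caller drains the queue synchronously.
        List<MockCoordinatorExecutor.ExecutorResult<String>> results = executor.poll();
        // results holds one ExecutorResult keyed "group-regex"; afterwards isScheduled("group-regex") is false.
    }
}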


@ -181,13 +181,13 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Heartbeat request to join the group. Note that the member subscribes
// to a nonexistent topic.
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid().toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicRegex("foo")
.setSubscribedTopicRegex("foo*")
.setTopicPartitions(List.empty.asJava),
true
).build()
@ -204,6 +204,40 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()
}


@ -228,6 +228,7 @@ public class GroupCoordinatorConfig {
.define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC)
.define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC);
public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef()
.define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
@ -238,6 +239,7 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC);
public static final ConfigDef SHARE_GROUP_CONFIG_DEF = new ConfigDef()
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)


@ -217,6 +217,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
.withExecutor(executor)
.withGroupConfigManager(groupConfigManager)
.withConsumerGroupAssignors(config.consumerGroupAssignors())
.withConsumerGroupMaxSize(config.consumerGroupMaxSize())


@ -62,6 +62,7 @@ import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
@ -118,6 +119,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
@ -134,6 +136,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -199,6 +202,7 @@ public class GroupMetadataManager {
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private CoordinatorTimer<Void, CoordinatorRecord> timer = null;
private CoordinatorExecutor<CoordinatorRecord> executor = null;
private List<ConsumerGroupPartitionAssignor> consumerGroupAssignors = null;
private GroupConfigManager groupConfigManager = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE;
@ -239,6 +243,11 @@ public class GroupMetadataManager {
return this;
}
Builder withExecutor(CoordinatorExecutor<CoordinatorRecord> executor) {
this.executor = executor;
return this;
}
Builder withConsumerGroupAssignors(List<ConsumerGroupPartitionAssignor> consumerGroupAssignors) {
this.consumerGroupAssignors = consumerGroupAssignors;
return this;
@ -342,6 +351,8 @@ public class GroupMetadataManager {
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
if (executor == null)
throw new IllegalArgumentException("Executor must be set.");
if (consumerGroupAssignors == null || consumerGroupAssignors.isEmpty())
throw new IllegalArgumentException("Assignors must be set before building.");
if (shareGroupAssignor == null)
@ -356,6 +367,7 @@ public class GroupMetadataManager {
logContext,
time,
timer,
executor,
metrics,
consumerGroupAssignors,
metadataImage,
@ -379,6 +391,12 @@ public class GroupMetadataManager {
}
}
/**
* The minimum amount of time between two consecutive refreshes of
* the regular expressions within a single group.
*/
private static final long REGEX_BATCH_REFRESH_INTERVAL_MS = 10_000L;
/**
* The log context.
*/
@ -404,6 +422,11 @@ public class GroupMetadataManager {
*/
private final CoordinatorTimer<Void, CoordinatorRecord> timer;
/**
* The executor to execute asynchronous tasks.
*/
private final CoordinatorExecutor<CoordinatorRecord> executor;
/**
* The coordinator metrics.
*/
@ -459,6 +482,12 @@ public class GroupMetadataManager {
*/
private MetadataImage metadataImage;
/**
* This tracks the version (or the offset) of the last metadata image
* with newly created topics.
*/
private long lastMetadataImageWithNewTopics = -1L;
/**
* An empty result returned to the state machine. This means that
* there are no records to append to the log.
@ -528,6 +557,7 @@ public class GroupMetadataManager {
LogContext logContext,
Time time,
CoordinatorTimer<Void, CoordinatorRecord> timer,
CoordinatorExecutor<CoordinatorRecord> executor,
GroupCoordinatorMetricsShard metrics,
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
MetadataImage metadataImage,
@ -553,6 +583,7 @@ public class GroupMetadataManager {
this.snapshotRegistry = snapshotRegistry;
this.time = time;
this.timer = timer;
this.executor = executor;
this.metrics = metrics;
this.metadataImage = metadataImage;
this.consumerGroupAssignors = consumerGroupAssignors.stream().collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity()));
@ -1812,23 +1843,36 @@ public class GroupMetadataManager {
.setClassicMemberMetadata(null)
.build();
boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
// If the group is newly created, we must ensure that it moves away from
// epoch 0 and that it is fully initialized.
boolean bumpGroupEpoch = group.groupEpoch() == 0;
bumpGroupEpoch |= hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
bumpGroupEpoch |= maybeUpdateRegularExpressions(
group,
member,
updatedMember,
records
);
int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
Map<String, Integer> subscribedTopicNamesMap = group.subscribedTopicNames();
SubscriptionType subscriptionType = group.subscriptionType();
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
// 2) The refresh deadline has been reached.
subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
Map<String, Integer> subscribedTopicNamesMap = group.computeSubscribedTopicNames(
member,
updatedMember
);
subscriptionMetadata = group.computeSubscriptionMetadata(
subscribedTopicNamesMap,
metadataImage.topics(),
@ -2407,15 +2451,14 @@ public class GroupMetadataManager {
/**
* Creates the member subscription record if the updatedMember is different from
* the old member. Returns true if the subscribedTopicNames/subscribedTopicRegex
* has changed.
* the old member. Returns true if the subscribedTopicNames has changed.
*
* @param groupId The group id.
* @param member The old member.
* @param updatedMember The updated member.
* @param records The list to accumulate any new records.
* @return A boolean indicating whether the updatedMember has a different
* subscribedTopicNames/subscribedTopicRegex from the old member.
* subscribedTopicNames from the old member.
* @throws InvalidRegularExpression if the regular expression is invalid.
*/
private boolean hasMemberSubscriptionChanged(
@ -2427,27 +2470,283 @@ public class GroupMetadataManager {
String memberId = updatedMember.memberId();
if (!updatedMember.equals(member)) {
records.add(newConsumerGroupMemberSubscriptionRecord(groupId, updatedMember));
if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
log.debug("[GroupId {}] Member {} updated its subscribed topics to: {}.",
groupId, memberId, updatedMember.subscribedTopicNames());
return true;
}
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.",
groupId, memberId, updatedMember.subscribedTopicRegex());
// If the regular expression has changed, we compile it to ensure that
// its syntax is valid.
if (updatedMember.subscribedTopicRegex() != null) {
throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
}
return true;
}
}
return false;
}
private static boolean isNotEmpty(String value) {
return value != null && !value.isEmpty();
}
/**
* Check whether the member has updated its subscribed topic regular expression and
* may trigger the resolution/refresh of all the regular expressions in the
* group. We align the refresh of the regular expressions so that they
* trigger only one rebalance per update.
*
* @param group The consumer group.
* @param member The old member.
* @param updatedMember The new member.
* @param records The records accumulator.
* @return Whether a rebalance must be triggered.
*/
private boolean maybeUpdateRegularExpressions(
ConsumerGroup group,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
List<CoordinatorRecord> records
) {
String groupId = group.groupId();
String memberId = updatedMember.memberId();
String oldSubscribedTopicRegex = member.subscribedTopicRegex();
String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex();
boolean bumpGroupEpoch = false;
boolean requireRefresh = false;
// Check whether the member has changed its subscribed regex.
if (!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex)) {
log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.",
groupId, memberId, newSubscribedTopicRegex);
if (isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
// If the member was the last one subscribed to the regex, we delete the
// resolved regular expression.
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
groupId,
oldSubscribedTopicRegex
));
}
if (isNotEmpty(newSubscribedTopicRegex)) {
if (group.numSubscribedMembers(newSubscribedTopicRegex) == 0) {
// If the member subscribed to a new regex, we compile it to ensure its validity.
// We also trigger a refresh of the regexes in order to resolve it.
throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
requireRefresh = true;
} else {
// If the new regex is already resolved, we trigger a rebalance
// by bumping the group epoch.
bumpGroupEpoch = group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent();
}
}
}
// Conditions to trigger a refresh:
// 0. The group is subscribed to regular expressions.
// 1. There is no ongoing refresh for the group.
// 2. The last refresh is older than 10s.
// 3. The group has unresolved regular expressions.
// 4. The metadata image has new topics.
// 0. The group is subscribed to regular expressions. We also take into account the one
// that the current member may have just introduced.
if (!requireRefresh && group.subscribedRegularExpressions().isEmpty()) {
return bumpGroupEpoch;
}
// 1. There is no ongoing refresh for the group.
String key = group.groupId() + "-regex";
if (executor.isScheduled(key)) {
return bumpGroupEpoch;
}
// 2. The last refresh is older than 10s. If the group does not have any regular
// expressions but the current member just brought a new one, we should continue.
long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs();
if (time.milliseconds() <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_INTERVAL_MS) {
return bumpGroupEpoch;
}
// 3. The group has unresolved regular expressions.
Map<String, Integer> subscribedRegularExpressions = new HashMap<>(group.subscribedRegularExpressions());
if (isNotEmpty(oldSubscribedTopicRegex)) {
subscribedRegularExpressions.compute(oldSubscribedTopicRegex, Utils::decValue);
}
if (isNotEmpty(newSubscribedTopicRegex)) {
subscribedRegularExpressions.compute(newSubscribedTopicRegex, Utils::incValue);
}
requireRefresh |= subscribedRegularExpressions.size() != group.numResolvedRegularExpressions();
// 4. The metadata has new topics that we must consider.
requireRefresh |= group.lastResolvedRegularExpressionVersion() < lastMetadataImageWithNewTopics;
if (requireRefresh && !subscribedRegularExpressions.isEmpty()) {
Set<String> regexes = Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
executor.schedule(
key,
() -> refreshRegularExpressions(groupId, log, time, metadataImage, regexes),
(result, exception) -> handleRegularExpressionsResult(groupId, result, exception)
);
}
return bumpGroupEpoch;
}
/**
* Resolves the provided regular expressions. Note that this static method is executed
* as an asynchronous task in the executor. Hence, it should not access any state from
* the manager.
*
* @param groupId The group id.
* @param log The log instance.
* @param time The time instance.
* @param image The metadata image to use for listing the topics.
* @param regexes The list of regular expressions that must be resolved.
* @return The list of resolved regular expressions.
*
* public for benchmarks.
*/
public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
String groupId,
Logger log,
Time time,
MetadataImage image,
Set<String> regexes
) {
long startTimeMs = time.milliseconds();
log.debug("[GroupId {}] Refreshing regular expressions: {}", groupId, regexes);
Map<String, Set<String>> resolvedRegexes = new HashMap<>(regexes.size());
List<Pattern> compiledRegexes = new ArrayList<>(regexes.size());
for (String regex : regexes) {
resolvedRegexes.put(regex, new HashSet<>());
try {
compiledRegexes.add(Pattern.compile(regex));
} catch (PatternSyntaxException ex) {
// This should not happen because the regular expressions are validated
// when received from the members. If it does happen for some reason,
// we log it and ignore it.
log.error("[GroupId {}] Couldn't parse regular expression '{}' due to `{}`. Ignoring it.",
groupId, regex, ex.getDescription());
}
}
for (String topicName : image.topics().topicsByName().keySet()) {
for (Pattern regex : compiledRegexes) {
if (regex.matcher(topicName).matches()) {
resolvedRegexes.get(regex.pattern()).add(topicName);
}
}
}
long version = image.provenance().lastContainedOffset();
Map<String, ResolvedRegularExpression> result = new HashMap<>(resolvedRegexes.size());
for (Map.Entry<String, Set<String>> resolvedRegex : resolvedRegexes.entrySet()) {
result.put(
resolvedRegex.getKey(),
new ResolvedRegularExpression(resolvedRegex.getValue(), version, startTimeMs)
);
}
log.info("[GroupId {}] Scanned {} topics to refresh regular expressions {} in {}ms.",
groupId, image.topics().topicsByName().size(), resolvedRegexes.keySet(),
time.milliseconds() - startTimeMs);
return result;
}
/**
* Handle the result of the asynchronous task which resolves the regular expressions.
*
* @param groupId The group id.
* @param resolvedRegularExpressions The resolved regular expressions.
* @param exception The exception if the resolution failed.
* @return A CoordinatorResult containing the records to mutate the group state.
*/
private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResult(
String groupId,
Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
Throwable exception
) {
if (exception != null) {
log.error("[GroupId {}] Couldn't update regular expression due to: {}",
groupId, exception.getMessage());
return new CoordinatorResult<>(Collections.emptyList());
}
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Received updated regular expressions: {}.",
groupId, resolvedRegularExpressions);
}
List<CoordinatorRecord> records = new ArrayList<>();
try {
ConsumerGroup group = consumerGroup(groupId);
Map<String, Integer> subscribedTopicNames = new HashMap<>(group.subscribedTopicNames());
boolean bumpGroupEpoch = false;
for (Map.Entry<String, ResolvedRegularExpression> entry : resolvedRegularExpressions.entrySet()) {
String regex = entry.getKey();
// We can skip the regex if the group is no longer
// subscribed to it.
if (group.numSubscribedMembers(regex) == 0) continue;
ResolvedRegularExpression newResolvedRegularExpression = entry.getValue();
ResolvedRegularExpression oldResolvedRegularExpression = group
.resolvedRegularExpression(regex)
.orElse(ResolvedRegularExpression.EMPTY);
if (!oldResolvedRegularExpression.topics.equals(newResolvedRegularExpression.topics)) {
bumpGroupEpoch = true;
oldResolvedRegularExpression.topics.forEach(topicName ->
subscribedTopicNames.compute(topicName, Utils::decValue)
);
newResolvedRegularExpression.topics.forEach(topicName ->
subscribedTopicNames.compute(topicName, Utils::incValue)
);
}
// Add the record to persist the change.
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
regex,
newResolvedRegularExpression
));
}
// Compute the subscription metadata.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
subscribedTopicNames,
metadataImage.topics(),
metadataImage.cluster()
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
}
bumpGroupEpoch = true;
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
if (bumpGroupEpoch) {
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(
time.milliseconds() + consumerGroupMetadataRefreshIntervalMs,
groupEpoch
);
}
} catch (GroupIdNotFoundException ex) {
log.debug("[GroupId {}] Received result of regular expression resolution but " +
"it no longer exists.", groupId);
}
return new CoordinatorResult<>(records);
}
/**
* Creates the member subscription record if the updatedMember is different from
* the old member. Returns true if the subscribedTopicNames has changed.
@ -3835,24 +4134,39 @@ public class GroupMetadataManager {
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
metadataImage = newImage;
// Initialize the last offset if it was not set yet.
if (lastMetadataImageWithNewTopics == -1L) {
lastMetadataImageWithNewTopics = metadataImage.provenance().lastContainedOffset();
}
TopicsDelta topicsDelta = delta.topicsDelta();
if (topicsDelta == null) return;
// Update the last offset of the image with newly created topics. This is used to
// trigger a refresh of all the regular expressions when topics are created. Note
// that we don't trigger a refresh when topics are deleted. Those are removed from
// the subscription metadata (and the assignment) via the above mechanism. The
// resolved regular expressions are cleaned up on the next refresh.
if (!topicsDelta.createdTopicIds().isEmpty()) {
lastMetadataImageWithNewTopics = metadataImage.provenance().lastContainedOffset();
}
// Notify all the groups subscribed to the created, updated or
// deleted topics.
Optional.ofNullable(delta.topicsDelta()).ifPresent(topicsDelta -> {
Set<String> allGroupIds = new HashSet<>();
topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
String topicName = topicDelta.name();
allGroupIds.addAll(groupsSubscribedToTopic(topicName));
});
topicsDelta.deletedTopicIds().forEach(topicId -> {
TopicImage topicImage = delta.image().topics().getTopic(topicId);
allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
});
allGroupIds.forEach(groupId -> {
Group group = groups.get(groupId);
if (group != null && (group.type() == CONSUMER || group.type() == SHARE)) {
((ModernGroup<?>) group).requestMetadataRefresh();
}
});
Set<String> allGroupIds = new HashSet<>();
topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
String topicName = topicDelta.name();
allGroupIds.addAll(groupsSubscribedToTopic(topicName));
});
topicsDelta.deletedTopicIds().forEach(topicId -> {
TopicImage topicImage = delta.image().topics().getTopic(topicId);
allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
});
allGroupIds.forEach(groupId -> {
Group group = groups.get(groupId);
if (group != null && (group.type() == CONSUMER || group.type() == SHARE)) {
((ModernGroup<?>) group).requestMetadataRefresh();
}
});
}


@ -48,6 +48,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -379,6 +380,32 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
}
}
/**
* @return The last time resolved regular expressions were refreshed or Long.MIN_VALUE if
* there are no resolved regular expressions. Note that we use the timestamp of the first
* entry as a proxy for all of them. They are always resolved together.
*/
public long lastResolvedRegularExpressionRefreshTimeMs() {
Iterator<ResolvedRegularExpression> iterator = resolvedRegularExpressions.values().iterator();
if (iterator.hasNext()) {
return iterator.next().timestamp;
} else {
return Long.MIN_VALUE;
}
}
/**
* @return The version of the regular expressions or zero if there are no resolved regular expressions.
*/
public long lastResolvedRegularExpressionVersion() {
Iterator<ResolvedRegularExpression> iterator = resolvedRegularExpressions.values().iterator();
if (iterator.hasNext()) {
return iterator.next().version;
} else {
return 0L;
}
}
/**
* Return an optional containing the resolved regular expression corresponding to the provided regex
* or an empty optional.
@ -386,10 +413,17 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
* @param regex The regular expression.
* @return The optional containing the resolved regular expression or an empty optional.
*/
public Optional<ResolvedRegularExpression> regularExpression(String regex) {
public Optional<ResolvedRegularExpression> resolvedRegularExpression(String regex) {
return Optional.ofNullable(resolvedRegularExpressions.get(regex));
}
/**
* @return The number of resolved regular expressions.
*/
public int numResolvedRegularExpressions() {
return resolvedRegularExpressions.size();
}
/**
* @return The number of members subscribed to the provided regex.
*/
@ -397,6 +431,14 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
return subscribedRegularExpressions.getOrDefault(regex, 0);
}
/**
* @return An immutable map containing all the subscribed regular expressions
* with their subscriber counts.
*/
public Map<String, Integer> subscribedRegularExpressions() {
return Collections.unmodifiableMap(subscribedRegularExpressions);
}
/**
* @return The number of members that use the classic protocol.
*/
@ -747,11 +789,11 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
ConsumerGroupMember newMember
) {
// Decrement the count of the old regex.
if (oldMember != null && oldMember.subscribedTopicRegex() != null) {
if (oldMember != null && oldMember.subscribedTopicRegex() != null && !oldMember.subscribedTopicRegex().isEmpty()) {
subscribedRegularExpressions.compute(oldMember.subscribedTopicRegex(), Utils::decValue);
}
// Increment the count of the new regex.
if (newMember != null && newMember.subscribedTopicRegex() != null) {
if (newMember != null && newMember.subscribedTopicRegex() != null && !newMember.subscribedTopicRegex().isEmpty()) {
subscribedRegularExpressions.compute(newMember.subscribedTopicRegex(), Utils::incValue);
}
}


@ -24,6 +24,8 @@ import java.util.Set;
* The metadata associated with a regular expression in a Consumer Group.
*/
public class ResolvedRegularExpression {
public static final ResolvedRegularExpression EMPTY = new ResolvedRegularExpression(Collections.emptySet(), -1L, -1L);
/**
* The set of resolved topics.
*/


@ -66,6 +66,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ExpiredTimeout;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ScheduledTimeout;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
@ -15063,7 +15064,7 @@ public class GroupMetadataManagerTest {
assertEquals(
Optional.of(resolvedRegularExpression),
context.groupMetadataManager.consumerGroup("foo").regularExpression("abc*")
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
);
}
@ -15097,7 +15098,7 @@ public class GroupMetadataManagerTest {
assertEquals(
Optional.empty(),
context.groupMetadataManager.consumerGroup("foo").regularExpression("abc*")
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
);
}
@ -15178,6 +15179,526 @@ public class GroupMetadataManagerTest {
);
}
@Test
public void testConsumerGroupMemberJoinsWithNewRegex() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build(12345L))
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(List.of("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.build())
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.withAssignmentEpoch(10))
.build();
// Member 2 joins the consumer group with a new regular expression.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()),
result.response()
);
ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.build();
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
);
assertRecordsEquals(expectedRecords, result.records());
// Execute pending tasks.
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks = context.processTasks();
assertEquals(
List.of(
new MockCoordinatorExecutor.ExecutorResult<>(
groupId + "-regex",
new CoordinatorResult<>(List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*",
new ResolvedRegularExpression(
Set.of("foo"),
12345L,
context.time.milliseconds()
)
),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
))
)
),
tasks
);
}
@Test
public void testConsumerGroupMemberJoinsWithUpdatedRegex() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build(12345L))
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.build())
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.withAssignmentEpoch(10))
.build();
// Member 1 updates its regular expression.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000),
result.response()
);
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.build();
List<CoordinatorRecord> expectedRecords = List.of(
// The member subscription is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// The previous regular expression is deleted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
);
assertRecordsEquals(expectedRecords, result.records());
// Execute pending tasks.
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks = context.processTasks();
assertEquals(1, tasks.size());
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
assertRecordsEquals(
List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*|bar*",
new ResolvedRegularExpression(
Set.of("foo", "bar"),
12345L,
context.time.milliseconds()
)
),
// The updated subscription metadata.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
groupId,
Map.of(
"foo", new TopicMetadata(fooTopicId, fooTopicName, 6),
"bar", new TopicMetadata(barTopicId, barTopicName, 3)
)
),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
),
task.result.records()
);
}
@Test
public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCompleted() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build(12345L))
.build();
// Member 1 joins.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()),
result.response()
);
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.build();
List<CoordinatorRecord> expectedRecords = List.of(
// The member subscription is created.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
// The target assignment is created.
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Collections.emptyMap()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
// The member current state is created.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1)
);
assertRecordsEquals(expectedRecords, result.records());
// The task is scheduled.
assertTrue(context.executor.isScheduled(groupId + "-regex"));
// The member updates its regex before the resolution of the previous one completes.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000),
result.response()
);
expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.build();
expectedRecords = List.of(
// The member subscription is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// The previous regex is deleted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
);
assertRecordsEquals(expectedRecords, result.records());
// The task is still scheduled.
assertTrue(context.executor.isScheduled(groupId + "-regex"));
assertEquals(1, context.executor.size());
// Execute the pending tasks.
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks = context.processTasks();
assertEquals(1, tasks.size());
// The pending task was a no-op.
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
assertRecordsEquals(Collections.emptyList(), task.result.records());
// The member heartbeats again. It triggers a new resolution.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000),
result.response()
);
assertTrue(context.executor.isScheduled(groupId + "-regex"));
assertEquals(1, context.executor.size());
// Execute pending tasks.
tasks = context.processTasks();
assertEquals(1, tasks.size());
task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
assertRecordsEquals(
List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*|bar*",
new ResolvedRegularExpression(
Set.of("foo", "bar"),
12345L,
context.time.milliseconds()
)
),
// The updated subscription metadata.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
groupId,
Map.of(
"foo", new TopicMetadata(fooTopicId, fooTopicName, 6),
"bar", new TopicMetadata(barTopicId, barTopicName, 3)
)
),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2)
),
task.result.records()
);
}
@Test
public void testConsumerGroupMemberJoinRefreshesExpiredRegexes() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
Uuid foooTopicId = Uuid.randomUuid();
String foooTopicName = "fooo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
MetadataImage image = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build(1L);
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withMetadataImage(image)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.build())
.withMember(new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(barTopicId, 0, 1, 2)))
.build())
.withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
Set.of(fooTopicName), 0L, 0L))
.withResolvedRegularExpression("bar*", new ResolvedRegularExpression(
Set.of(barTopicName), 0L, 0L))
.withSubscriptionMetadata(Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)))
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.withAssignment(memberId2, mkAssignment(
mkTopicAssignment(barTopicId, 0, 1, 2)))
.withAssignmentEpoch(10))
.build();
// Update metadata image.
MetadataImage newImage = new MetadataImageBuilder(image)
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addTopic(foooTopicId, foooTopicName, 1)
.build(2L);
context.groupMetadataManager.onNewMetadataImage(
newImage,
new MetadataDelta(newImage)
);
// A member heartbeats.
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(10));
// The task is NOT scheduled.
assertFalse(context.executor.isScheduled(groupId + "-regex"));
// Advance past the batching interval.
context.sleep(11000L);
// A member heartbeats.
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(10));
// The task is scheduled.
assertTrue(context.executor.isScheduled(groupId + "-regex"));
// Execute the pending tasks.
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks = context.processTasks();
assertEquals(1, tasks.size());
// Execute pending tasks.
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*",
new ResolvedRegularExpression(
Set.of(fooTopicName, foooTopicName),
2L,
context.time.milliseconds()
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"bar*",
new ResolvedRegularExpression(
Set.of(barTopicName),
2L,
context.time.milliseconds()
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
groupId,
Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3),
foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1)
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
);
assertUnorderedListEquals(expectedRecords.subList(0, 2), task.result.records().subList(0, 2));
assertRecordsEquals(expectedRecords.subList(2, 4), task.result.records().subList(2, 4));
}
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,


@ -51,6 +51,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
@ -398,8 +399,9 @@ public class GroupMetadataManagerTestContext {
}
public static class Builder {
private final MockTime time = new MockTime();
private final MockTime time = new MockTime(0, 0, 0);
private final MockCoordinatorTimer<Void, CoordinatorRecord> timer = new MockCoordinatorTimer<>(time);
private final MockCoordinatorExecutor<CoordinatorRecord> executor = new MockCoordinatorExecutor<>();
private final LogContext logContext = new LogContext();
private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
private MetadataImage metadataImage;
@ -493,6 +495,7 @@ public class GroupMetadataManagerTestContext {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(
time,
timer,
executor,
snapshotRegistry,
metrics,
new GroupMetadataManager.Builder()
@ -500,6 +503,7 @@ public class GroupMetadataManagerTestContext {
.withLogContext(logContext)
.withTime(time)
.withTimer(timer)
.withExecutor(executor)
.withMetadataImage(metadataImage)
.withConsumerGroupHeartbeatInterval(5000)
.withConsumerGroupSessionTimeout(45000)
@ -533,6 +537,7 @@ public class GroupMetadataManagerTestContext {
final MockTime time;
final MockCoordinatorTimer<Void, CoordinatorRecord> timer;
final MockCoordinatorExecutor<CoordinatorRecord> executor;
final SnapshotRegistry snapshotRegistry;
final GroupCoordinatorMetricsShard metrics;
final GroupMetadataManager groupMetadataManager;
@ -546,6 +551,7 @@ public class GroupMetadataManagerTestContext {
public GroupMetadataManagerTestContext(
MockTime time,
MockCoordinatorTimer<Void, CoordinatorRecord> timer,
MockCoordinatorExecutor<CoordinatorRecord> executor,
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
GroupMetadataManager groupMetadataManager,
@ -555,6 +561,7 @@ public class GroupMetadataManagerTestContext {
) {
this.time = time;
this.timer = timer;
this.executor = executor;
this.snapshotRegistry = snapshotRegistry;
this.metrics = metrics;
this.groupMetadataManager = groupMetadataManager;
@ -677,6 +684,16 @@ public class GroupMetadataManagerTestContext {
return timeouts;
}
public List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> processTasks() {
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> results = executor.poll();
results.forEach(taskResult -> {
if (taskResult.result.replayRecords()) {
taskResult.result.records().forEach(this::replay);
}
});
return results;
}
public void assertSessionTimeout(
String groupId,
String memberId,


@ -27,7 +27,15 @@ import org.apache.kafka.image.MetadataProvenance;
import java.util.Arrays;
public class MetadataImageBuilder {
private final MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
private final MetadataDelta delta;
public MetadataImageBuilder() {
this(MetadataImage.EMPTY);
}
public MetadataImageBuilder(MetadataImage image) {
this.delta = new MetadataDelta(image);
}
public MetadataImageBuilder addTopic(
Uuid topicId,
@ -61,6 +69,10 @@ public class MetadataImageBuilder {
}
public MetadataImage build() {
return delta.apply(MetadataProvenance.EMPTY);
return build(0);
}
public MetadataImage build(long version) {
return delta.apply(new MetadataProvenance(version, 0, 0L, true));
}
}


@ -49,6 +49,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
@ -100,6 +101,7 @@ public class OffsetMetadataManagerTest {
public static class Builder {
private final MockTime time = new MockTime();
private final MockCoordinatorTimer<Void, CoordinatorRecord> timer = new MockCoordinatorTimer<>(time);
private final MockCoordinatorExecutor<CoordinatorRecord> executor = new MockCoordinatorExecutor<>();
private final LogContext logContext = new LogContext();
private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
private final GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
@ -133,6 +135,7 @@ public class OffsetMetadataManagerTest {
groupMetadataManager = new GroupMetadataManager.Builder()
.withTime(time)
.withTimer(timer)
.withExecutor(executor)
.withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext)
.withMetadataImage(metadataImage)


@ -1615,26 +1615,39 @@ public class ConsumerGroupTest {
ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3")
.setSubscribedTopicRegex("regex1")
.build();
ConsumerGroupMember member4 = new ConsumerGroupMember.Builder("member4")
.build();
// Assert the initial state.
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
// Add member 1.
consumerGroup.updateMember(member1);
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
// Add member 2.
consumerGroup.updateMember(member2);
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
// Add member 3.
consumerGroup.updateMember(member3);
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(2, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
// Add member 4.
consumerGroup.updateMember(member4);
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(2, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
@ -1644,24 +1657,28 @@ public class ConsumerGroupTest {
.setSubscribedTopicRegex("regex2")
.build();
consumerGroup.updateMember(member3);
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
// Remove member 1.
consumerGroup.removeMember(member1.memberId());
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
// Remove member 2.
consumerGroup.removeMember(member2.memberId());
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
// Remove member 3.
consumerGroup.removeMember(member3.memberId());
assertEquals(0, consumerGroup.numSubscribedMembers(""));
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));


@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.coordinator;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class RegexResolutionBenchmark {
private static final Logger LOG = new LogContext().logger(RegexResolutionBenchmark.class);
private static final Time TIME = Time.SYSTEM;
private static final String GROUP_ID = "my-group-id";
private static final List<String> WORDS = List.of(
"data",
"stream",
"queue",
"analytics",
"service",
"event",
"log",
"cloud",
"process",
"system",
"message",
"broker",
"partition",
"key",
"value",
"cluster",
"zookeeper",
"replication",
"topic",
"producer"
);
@Param({"10000", "100000", "1000000"})
private int topicCount;
@Param({"1", "10", "100"})
private int regexCount;
private MetadataImage image;
private Set<String> regexes;
@Setup(Level.Trial)
public void setup() {
Random random = new Random();
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
for (int i = 0; i < topicCount; i++) {
String topicName =
WORDS.get(random.nextInt(WORDS.size())) + "_" +
WORDS.get(random.nextInt(WORDS.size())) + "_" +
i;
delta.replay(new TopicRecord()
.setTopicId(Uuid.randomUuid())
.setName(topicName));
}
image = delta.apply(MetadataProvenance.EMPTY);
regexes = new HashSet<>();
for (int i = 0; i < regexCount; i++) {
regexes.add(".*" + WORDS.get(random.nextInt(WORDS.size())) + ".*");
}
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void run() {
GroupMetadataManager.refreshRegularExpressions(
GROUP_ID,
LOG,
TIME,
image,
regexes
);
}
}