diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorExecutor.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorExecutor.java new file mode 100644 index 00000000000..40b946bbefd --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorExecutor.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; + +public class MockCoordinatorExecutor implements CoordinatorExecutor { + private class ExecutorTask { + public final String key; + public final TaskRunnable task; + public final TaskOperation operation; + + ExecutorTask( + String key, + TaskRunnable task, + TaskOperation operation + ) { + this.key = Objects.requireNonNull(key); + this.task = Objects.requireNonNull(task); + this.operation = Objects.requireNonNull(operation); + } + + CoordinatorResult execute() { + try { + return operation.onComplete(task.run(), null); + } catch (Throwable ex) { + return operation.onComplete(null, ex); + } + } + } + + public static class ExecutorResult { + public final String key; + public final CoordinatorResult result; + + public ExecutorResult( + String key, + CoordinatorResult result + ) { + this.key = Objects.requireNonNull(key); + this.result = Objects.requireNonNull(result); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ExecutorResult that = (ExecutorResult) o; + + if (!Objects.equals(key, that.key)) return false; + return Objects.equals(result, that.result); + } + + @Override + public int hashCode() { + int result = key.hashCode(); + result = 31 * result + this.result.hashCode(); + return result; + } + + @Override + public String toString() { + return "ExecutorResult(" + + "key='" + key + '\'' + + ", result=" + result + + ')'; + } + } + + private final Map> tasks = new HashMap<>(); + private final Queue> queue = new ArrayDeque<>(); + + @Override + public boolean schedule( + String key, + TaskRunnable task, + TaskOperation operation + ) { + if (tasks.putIfAbsent(key, task) != null) return false; + return queue.add(new ExecutorTask<>(key, task, operation)); + } + + @Override + public boolean isScheduled(String key) { + return tasks.containsKey(key); + } + + public int size() { + return queue.size(); + } + + @Override + public void cancel(String key) { + tasks.remove(key); + } + + public List> poll() { + List> results = new ArrayList<>(); + for (ExecutorTask task : queue) { + tasks.remove(task.key, task.task); + results.add(new ExecutorResult<>(task.key, task.execute())); + } + queue.clear(); + return results; + } +} diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 212dad307b3..a4e3de61855 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -181,13 +181,13 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { // Heartbeat request to join the group. Note that the member subscribes // to an nonexistent topic. - val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(Uuid.randomUuid().toString) .setMemberEpoch(0) .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicRegex("foo") + .setSubscribedTopicRegex("foo*") .setTopicPartitions(List.empty.asJava), true ).build() @@ -204,6 +204,40 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertNotNull(consumerGroupHeartbeatResponse.data.memberId) assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) + + // Create the topic. + val topicId = TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Prepare the next heartbeat. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch), + true + ).build() + + // This is the expected assignment. + val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + + // Heartbeats until the partitions are assigned. + consumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") + + // Verify the response. + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) } finally { admin.close() } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 5e7e18efa1f..f94b2cc706b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -228,6 +228,7 @@ public class GroupCoordinatorConfig { .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC) .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC) .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC); + public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef() .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC) .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) @@ -238,6 +239,7 @@ public class GroupCoordinatorConfig { .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC) .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC) .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC); + public static final ConfigDef SHARE_GROUP_CONFIG_DEF = new ConfigDef() .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC) .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 78f161b3c30..099755710b9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -217,6 +217,7 @@ public class GroupCoordinatorShard implements CoordinatorShard timer = null; + private CoordinatorExecutor executor = null; private List consumerGroupAssignors = null; private GroupConfigManager groupConfigManager = null; private int consumerGroupMaxSize = Integer.MAX_VALUE; @@ -239,6 +243,11 @@ public class GroupMetadataManager { return this; } + Builder withExecutor(CoordinatorExecutor executor) { + this.executor = executor; + return this; + } + Builder withConsumerGroupAssignors(List consumerGroupAssignors) { this.consumerGroupAssignors = consumerGroupAssignors; return this; @@ -342,6 +351,8 @@ public class GroupMetadataManager { if (timer == null) throw new IllegalArgumentException("Timer must be set."); + if (executor == null) + throw new IllegalArgumentException("Executor must be set."); if (consumerGroupAssignors == null || consumerGroupAssignors.isEmpty()) throw new IllegalArgumentException("Assignors must be set before building."); if (shareGroupAssignor == null) @@ -356,6 +367,7 @@ public class GroupMetadataManager { logContext, time, timer, + executor, metrics, consumerGroupAssignors, metadataImage, @@ -379,6 +391,12 @@ public class GroupMetadataManager { } } + /** + * The minimum amount of time between two consecutive refreshes of + * the regular expressions within a single group. + */ + private static final long REGEX_BATCH_REFRESH_INTERVAL_MS = 10_000L; + /** * The log context. */ @@ -404,6 +422,11 @@ public class GroupMetadataManager { */ private final CoordinatorTimer timer; + /** + * The executor to executor asynchronous tasks. + */ + private final CoordinatorExecutor executor; + /** * The coordinator metrics. */ @@ -459,6 +482,12 @@ public class GroupMetadataManager { */ private MetadataImage metadataImage; + /** + * This tracks the version (or the offset) of the last metadata image + * with newly created topics. + */ + private long lastMetadataImageWithNewTopics = -1L; + /** * An empty result returned to the state machine. This means that * there are no records to append to the log. @@ -528,6 +557,7 @@ public class GroupMetadataManager { LogContext logContext, Time time, CoordinatorTimer timer, + CoordinatorExecutor executor, GroupCoordinatorMetricsShard metrics, List consumerGroupAssignors, MetadataImage metadataImage, @@ -553,6 +583,7 @@ public class GroupMetadataManager { this.snapshotRegistry = snapshotRegistry; this.time = time; this.timer = timer; + this.executor = executor; this.metrics = metrics; this.metadataImage = metadataImage; this.consumerGroupAssignors = consumerGroupAssignors.stream().collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity())); @@ -1812,23 +1843,36 @@ public class GroupMetadataManager { .setClassicMemberMetadata(null) .build(); - boolean bumpGroupEpoch = hasMemberSubscriptionChanged( + // If the group is newly created, we must ensure that it moves away from + // epoch 0 and that it is fully initialized. + boolean bumpGroupEpoch = group.groupEpoch() == 0; + + bumpGroupEpoch |= hasMemberSubscriptionChanged( groupId, member, updatedMember, records ); + bumpGroupEpoch |= maybeUpdateRegularExpressions( + group, + member, + updatedMember, + records + ); + int groupEpoch = group.groupEpoch(); Map subscriptionMetadata = group.subscriptionMetadata(); - Map subscribedTopicNamesMap = group.subscribedTopicNames(); SubscriptionType subscriptionType = group.subscriptionType(); if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { // The subscription metadata is updated in two cases: // 1) The member has updated its subscriptions; // 2) The refresh deadline has been reached. - subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember); + Map subscribedTopicNamesMap = group.computeSubscribedTopicNames( + member, + updatedMember + ); subscriptionMetadata = group.computeSubscriptionMetadata( subscribedTopicNamesMap, metadataImage.topics(), @@ -2407,15 +2451,14 @@ public class GroupMetadataManager { /** * Creates the member subscription record if the updatedMember is different from - * the old member. Returns true if the subscribedTopicNames/subscribedTopicRegex - * has changed. + * the old member. Returns true if the subscribedTopicNames has changed. * * @param groupId The group id. * @param member The old member. * @param updatedMember The updated member. * @param records The list to accumulate any new records. * @return A boolean indicating whether the updatedMember has a different - * subscribedTopicNames/subscribedTopicRegex from the old member. + * subscribedTopicNames from the old member. * @throws InvalidRegularExpression if the regular expression is invalid. */ private boolean hasMemberSubscriptionChanged( @@ -2427,27 +2470,283 @@ public class GroupMetadataManager { String memberId = updatedMember.memberId(); if (!updatedMember.equals(member)) { records.add(newConsumerGroupMemberSubscriptionRecord(groupId, updatedMember)); - if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { log.debug("[GroupId {}] Member {} updated its subscribed topics to: {}.", groupId, memberId, updatedMember.subscribedTopicNames()); return true; } - - if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { - log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.", - groupId, memberId, updatedMember.subscribedTopicRegex()); - // If the regular expression has changed, we compile it to ensure that - // its syntax is valid. - if (updatedMember.subscribedTopicRegex() != null) { - throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex()); - } - return true; - } } return false; } + private static boolean isNotEmpty(String value) { + return value != null && !value.isEmpty(); + } + + /** + * Check whether the member has updated its subscribed topic regular expression and + * may trigger the resolution/the refresh of all the regular expressions in the + * group. We align the refreshment of the regular expression in order to have + * them trigger only one rebalance per update. + * + * @param group The consumer group. + * @param member The old member. + * @param updatedMember The new member. + * @param records The records accumulator. + * @return Whether a rebalance must be triggered. + */ + private boolean maybeUpdateRegularExpressions( + ConsumerGroup group, + ConsumerGroupMember member, + ConsumerGroupMember updatedMember, + List records + ) { + String groupId = group.groupId(); + String memberId = updatedMember.memberId(); + String oldSubscribedTopicRegex = member.subscribedTopicRegex(); + String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex(); + + boolean bumpGroupEpoch = false; + boolean requireRefresh = false; + + // Check whether the member has changed its subscribed regex. + if (!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex)) { + log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.", + groupId, memberId, newSubscribedTopicRegex); + + if (isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) { + // If the member was the last one subscribed to the regex, we delete the + // resolved regular expression. + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone( + groupId, + oldSubscribedTopicRegex + )); + } + + if (isNotEmpty(newSubscribedTopicRegex)) { + if (group.numSubscribedMembers(newSubscribedTopicRegex) == 0) { + // If the member subscribed to a new regex, we compile it to ensure its validity. + // We also trigger a refresh of the regexes in order to resolve it. + throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex()); + requireRefresh = true; + } else { + // If the new regex is already resolved, we trigger a rebalance + // by bumping the group epoch. + bumpGroupEpoch = group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent(); + } + } + } + + // Conditions to trigger a refresh: + // 0. The group is subscribed to regular expressions. + // 1. There is no ongoing refresh for the group. + // 2. The last refresh is older than 10s. + // 3. The group has unresolved regular expressions. + // 4. The metadata image has new topics. + + // 0. The group is subscribed to regular expressions. We also take the one + // that the current may have just introduced. + if (!requireRefresh && group.subscribedRegularExpressions().isEmpty()) { + return bumpGroupEpoch; + } + + // 1. There is no ongoing refresh for the group. + String key = group.groupId() + "-regex"; + if (executor.isScheduled(key)) { + return bumpGroupEpoch; + } + + // 2. The last refresh is older than 10s. If the group does not have any regular + // expressions but the current member just brought a new one, we should continue. + long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs(); + if (time.milliseconds() <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_INTERVAL_MS) { + return bumpGroupEpoch; + } + + // 3. The group has unresolved regular expressions. + Map subscribedRegularExpressions = new HashMap<>(group.subscribedRegularExpressions()); + if (isNotEmpty(oldSubscribedTopicRegex)) { + subscribedRegularExpressions.compute(oldSubscribedTopicRegex, Utils::decValue); + } + if (isNotEmpty(newSubscribedTopicRegex)) { + subscribedRegularExpressions.compute(newSubscribedTopicRegex, Utils::incValue); + } + + requireRefresh |= subscribedRegularExpressions.size() != group.numResolvedRegularExpressions(); + + // 4. The metadata has new topics that we must consider. + requireRefresh |= group.lastResolvedRegularExpressionVersion() < lastMetadataImageWithNewTopics; + + if (requireRefresh && !subscribedRegularExpressions.isEmpty()) { + Set regexes = Collections.unmodifiableSet(subscribedRegularExpressions.keySet()); + executor.schedule( + key, + () -> refreshRegularExpressions(groupId, log, time, metadataImage, regexes), + (result, exception) -> handleRegularExpressionsResult(groupId, result, exception) + ); + } + + return bumpGroupEpoch; + } + + /** + * Resolves the provided regular expressions. Note that this static method is executed + * as an asynchronous task in the executor. Hence, it should not access any state from + * the manager. + * + * @param groupId The group id. + * @param log The log instance. + * @param time The time instance. + * @param image The metadata image to use for listing the topics. + * @param regexes The list of regular expressions that must be resolved. + * @return The list of resolved regular expressions. + * + * public for benchmarks. + */ + public static Map refreshRegularExpressions( + String groupId, + Logger log, + Time time, + MetadataImage image, + Set regexes + ) { + long startTimeMs = time.milliseconds(); + log.debug("[GroupId {}] Refreshing regular expressions: {}", groupId, regexes); + + Map> resolvedRegexes = new HashMap<>(regexes.size()); + List compiledRegexes = new ArrayList<>(regexes.size()); + for (String regex : regexes) { + resolvedRegexes.put(regex, new HashSet<>()); + try { + compiledRegexes.add(Pattern.compile(regex)); + } catch (PatternSyntaxException ex) { + // This should not happen because the regular expressions are validated + // when received from the members. If for some reason, it would + // happen, we log it and ignore it. + log.error("[GroupId {}] Couldn't parse regular expression '{}' due to `{}`. Ignoring it.", + groupId, regex, ex.getDescription()); + } + } + + for (String topicName : image.topics().topicsByName().keySet()) { + for (Pattern regex : compiledRegexes) { + if (regex.matcher(topicName).matches()) { + resolvedRegexes.get(regex.pattern()).add(topicName); + } + } + } + + long version = image.provenance().lastContainedOffset(); + Map result = new HashMap<>(resolvedRegexes.size()); + for (Map.Entry> resolvedRegex : resolvedRegexes.entrySet()) { + result.put( + resolvedRegex.getKey(), + new ResolvedRegularExpression(resolvedRegex.getValue(), version, startTimeMs) + ); + } + + log.info("[GroupId {}] Scanned {} topics to refresh regular expressions {} in {}ms.", + groupId, image.topics().topicsByName().size(), resolvedRegexes.keySet(), + time.milliseconds() - startTimeMs); + + return result; + } + + /** + * Handle the result of the asynchronous tasks which resolves the regular expressions. + * + * @param resolvedRegularExpressions The resolved regular expressions. + * @param exception The exception if the resolution failed. + * @return A CoordinatorResult containing the records to mutate the group state. + */ + private CoordinatorResult handleRegularExpressionsResult( + String groupId, + Map resolvedRegularExpressions, + Throwable exception + ) { + if (exception != null) { + log.error("[GroupId {}] Couldn't update regular expression due to: {}", + groupId, exception.getMessage()); + return new CoordinatorResult<>(Collections.emptyList()); + } + + if (log.isDebugEnabled()) { + log.debug("[GroupId {}] Received updated regular expressions: {}.", + groupId, resolvedRegularExpressions); + } + + List records = new ArrayList<>(); + try { + ConsumerGroup group = consumerGroup(groupId); + Map subscribedTopicNames = new HashMap<>(group.subscribedTopicNames()); + + boolean bumpGroupEpoch = false; + for (Map.Entry entry : resolvedRegularExpressions.entrySet()) { + String regex = entry.getKey(); + + // We can skip the regex if the group is no longer + // subscribed to it. + if (group.numSubscribedMembers(regex) == 0) continue; + + ResolvedRegularExpression newResolvedRegularExpression = entry.getValue(); + ResolvedRegularExpression oldResolvedRegularExpression = group + .resolvedRegularExpression(regex) + .orElse(ResolvedRegularExpression.EMPTY); + + if (!oldResolvedRegularExpression.topics.equals(newResolvedRegularExpression.topics)) { + bumpGroupEpoch = true; + + oldResolvedRegularExpression.topics.forEach(topicName -> + subscribedTopicNames.compute(topicName, Utils::decValue) + ); + + newResolvedRegularExpression.topics.forEach(topicName -> + subscribedTopicNames.compute(topicName, Utils::incValue) + ); + } + + // Add the record to persist the change. + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + regex, + newResolvedRegularExpression + )); + } + + // Compute the subscription metadata. + Map subscriptionMetadata = group.computeSubscriptionMetadata( + subscribedTopicNames, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + if (log.isDebugEnabled()) { + log.debug("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + } + bumpGroupEpoch = true; + records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + int groupEpoch = group.groupEpoch() + 1; + records.add(newConsumerGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + group.setMetadataRefreshDeadline( + time.milliseconds() + consumerGroupMetadataRefreshIntervalMs, + groupEpoch + ); + } + } catch (GroupIdNotFoundException ex) { + log.debug("[GroupId {}] Received result of regular expression resolution but " + + "it no longer exists.", groupId); + } + + return new CoordinatorResult<>(records); + } + /** * Creates the member subscription record if the updatedMember is different from * the old member. Returns true if the subscribedTopicNames has changed. @@ -3835,24 +4134,39 @@ public class GroupMetadataManager { public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { metadataImage = newImage; + // Initialize the last offset if it was not yet. + if (lastMetadataImageWithNewTopics == -1L) { + lastMetadataImageWithNewTopics = metadataImage.provenance().lastContainedOffset(); + } + + TopicsDelta topicsDelta = delta.topicsDelta(); + if (topicsDelta == null) return; + + // Updated the last offset of the image with newly created topics. This is used to + // trigger a refresh of all the regular expressions when topics are created. Note + // that we don't trigger a refresh when topics are deleted. Those are removed from + // the subscription metadata (and the assignment) via the above mechanism. The + // resolved regular expressions are cleaned up on the next refresh. + if (!topicsDelta.createdTopicIds().isEmpty()) { + lastMetadataImageWithNewTopics = metadataImage.provenance().lastContainedOffset(); + } + // Notify all the groups subscribed to the created, updated or // deleted topics. - Optional.ofNullable(delta.topicsDelta()).ifPresent(topicsDelta -> { - Set allGroupIds = new HashSet<>(); - topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { - String topicName = topicDelta.name(); - allGroupIds.addAll(groupsSubscribedToTopic(topicName)); - }); - topicsDelta.deletedTopicIds().forEach(topicId -> { - TopicImage topicImage = delta.image().topics().getTopic(topicId); - allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name())); - }); - allGroupIds.forEach(groupId -> { - Group group = groups.get(groupId); - if (group != null && (group.type() == CONSUMER || group.type() == SHARE)) { - ((ModernGroup) group).requestMetadataRefresh(); - } - }); + Set allGroupIds = new HashSet<>(); + topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { + String topicName = topicDelta.name(); + allGroupIds.addAll(groupsSubscribedToTopic(topicName)); + }); + topicsDelta.deletedTopicIds().forEach(topicId -> { + TopicImage topicImage = delta.image().topics().getTopic(topicId); + allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name())); + }); + allGroupIds.forEach(groupId -> { + Group group = groups.get(groupId); + if (group != null && (group.type() == CONSUMER || group.type() == SHARE)) { + ((ModernGroup) group).requestMetadataRefresh(); + } }); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 544ca7fa8d2..ed93251e544 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -48,6 +48,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -379,6 +380,32 @@ public class ConsumerGroup extends ModernGroup { } } + /** + * @return The last time resolved regular expressions were refreshed or Long.MIN_VALUE if + * there are no resolved regular expression. Note that we use the timestamp of the first + * entry as a proxy for all of them. They are always resolved together. + */ + public long lastResolvedRegularExpressionRefreshTimeMs() { + Iterator iterator = resolvedRegularExpressions.values().iterator(); + if (iterator.hasNext()) { + return iterator.next().timestamp; + } else { + return Long.MIN_VALUE; + } + } + + /** + * @return The version of the regular expressions or Zero if there are no resolved regular expression. + */ + public long lastResolvedRegularExpressionVersion() { + Iterator iterator = resolvedRegularExpressions.values().iterator(); + if (iterator.hasNext()) { + return iterator.next().version; + } else { + return 0L; + } + } + /** * Return an optional containing the resolved regular expression corresponding to the provided regex * or an empty optional. @@ -386,10 +413,17 @@ public class ConsumerGroup extends ModernGroup { * @param regex The regular expression. * @return The optional containing the resolved regular expression or an empty optional. */ - public Optional regularExpression(String regex) { + public Optional resolvedRegularExpression(String regex) { return Optional.ofNullable(resolvedRegularExpressions.get(regex)); } + /** + * @return The number of resolved regular expressions. + */ + public int numResolvedRegularExpressions() { + return resolvedRegularExpressions.size(); + } + /** * @return The number of members subscribed to the provided regex. */ @@ -397,6 +431,14 @@ public class ConsumerGroup extends ModernGroup { return subscribedRegularExpressions.getOrDefault(regex, 0); } + /** + * @return An immutable map containing all the subscribed regular expressions + * with the subscribers counts. + */ + public Map subscribedRegularExpressions() { + return Collections.unmodifiableMap(subscribedRegularExpressions); + } + /** * @return The number of members that use the classic protocol. */ @@ -747,11 +789,11 @@ public class ConsumerGroup extends ModernGroup { ConsumerGroupMember newMember ) { // Decrement the count of the old regex. - if (oldMember != null && oldMember.subscribedTopicRegex() != null) { + if (oldMember != null && oldMember.subscribedTopicRegex() != null && !oldMember.subscribedTopicRegex().isEmpty()) { subscribedRegularExpressions.compute(oldMember.subscribedTopicRegex(), Utils::decValue); } // Increment the count of the new regex. - if (newMember != null && newMember.subscribedTopicRegex() != null) { + if (newMember != null && newMember.subscribedTopicRegex() != null && !newMember.subscribedTopicRegex().isEmpty()) { subscribedRegularExpressions.compute(newMember.subscribedTopicRegex(), Utils::incValue); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ResolvedRegularExpression.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ResolvedRegularExpression.java index d13fb23da2f..7cef5602dd6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ResolvedRegularExpression.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ResolvedRegularExpression.java @@ -24,6 +24,8 @@ import java.util.Set; * The metadata associated with a regular expression in a Consumer Group. */ public class ResolvedRegularExpression { + public static final ResolvedRegularExpression EMPTY = new ResolvedRegularExpression(Collections.emptySet(), -1L, -1L); + /** * The set of resolved topics. */ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index e4f6606bf4f..c5bb26d4ba7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -66,6 +66,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor; import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ExpiredTimeout; import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ScheduledTimeout; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; @@ -15063,7 +15064,7 @@ public class GroupMetadataManagerTest { assertEquals( Optional.of(resolvedRegularExpression), - context.groupMetadataManager.consumerGroup("foo").regularExpression("abc*") + context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") ); } @@ -15097,7 +15098,7 @@ public class GroupMetadataManagerTest { assertEquals( Optional.empty(), - context.groupMetadataManager.consumerGroup("foo").regularExpression("abc*") + context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") ); } @@ -15178,6 +15179,526 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testConsumerGroupMemberJoinsWithNewRegex() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(12345L)) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10)) + .build(); + + // Member 2 joins the consumer group with a new regular expression. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .build(); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Execute pending tasks. + List> tasks = context.processTasks(); + assertEquals( + List.of( + new MockCoordinatorExecutor.ExecutorResult<>( + groupId + "-regex", + new CoordinatorResult<>(List.of( + // The resolution of the new regex is persisted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "foo*", + new ResolvedRegularExpression( + Set.of("foo"), + 12345L, + context.time.milliseconds() + ) + ), + // The group epoch is bumped. + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) + )) + ) + ), + tasks + ); + } + + @Test + public void testConsumerGroupMemberJoinsWithUpdatedRegex() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(12345L)) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10)) + .build(); + + // Member 1 updates its new regular expression. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000), + result.response() + ); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignorName("range") + .build(); + + List expectedRecords = List.of( + // The member subscription is updated. + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + // The previous regular expression is deleted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Execute pending tasks. + List> tasks = context.processTasks(); + assertEquals(1, tasks.size()); + + MockCoordinatorExecutor.ExecutorResult task = tasks.get(0); + assertEquals(groupId + "-regex", task.key); + assertRecordsEquals( + List.of( + // The resolution of the new regex is persisted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "foo*|bar*", + new ResolvedRegularExpression( + Set.of("foo", "bar"), + 12345L, + context.time.milliseconds() + ) + ), + // The updated subscription metadata. + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( + groupId, + Map.of( + "foo", new TopicMetadata(fooTopicId, fooTopicName, 6), + "bar", new TopicMetadata(barTopicId, barTopicName, 3) + ) + ), + // The group epoch is bumped. + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) + ), + task.result.records() + ); + } + + @Test + public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCompleted() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(12345L)) + .build(); + + // Member 1 joins. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .build(); + + List expectedRecords = List.of( + // The member subscription is created. + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + // The group epoch is bumped. + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), + // The target assignment is created. + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Collections.emptyMap()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), + // The member current state is created. + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // The task is scheduled. + assertTrue(context.executor.isScheduled(groupId + "-regex")); + + // The member updates its regex before the resolution of the previous one completes. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000), + result.response() + ); + + expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignorName("range") + .build(); + + expectedRecords = List.of( + // The member subscription is updated. + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + // The previous regex is deleted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // The task is still scheduled. + assertTrue(context.executor.isScheduled(groupId + "-regex")); + assertEquals(1, context.executor.size()); + + // Execute the pending tasks. + List> tasks = context.processTasks(); + assertEquals(1, tasks.size()); + + // The pending task was a no-op. + MockCoordinatorExecutor.ExecutorResult task = tasks.get(0); + assertEquals(groupId + "-regex", task.key); + assertRecordsEquals(Collections.emptyList(), task.result.records()); + + // The member heartbeats again. It triggers a new resolution. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000), + result.response() + ); + + assertTrue(context.executor.isScheduled(groupId + "-regex")); + assertEquals(1, context.executor.size()); + + // Execute pending tasks. + tasks = context.processTasks(); + assertEquals(1, tasks.size()); + + task = tasks.get(0); + assertEquals(groupId + "-regex", task.key); + assertRecordsEquals( + List.of( + // The resolution of the new regex is persisted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "foo*|bar*", + new ResolvedRegularExpression( + Set.of("foo", "bar"), + 12345L, + context.time.milliseconds() + ) + ), + // The updated subscription metadata. + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( + groupId, + Map.of( + "foo", new TopicMetadata(fooTopicId, fooTopicName, 6), + "bar", new TopicMetadata(barTopicId, barTopicName, 3) + ) + ), + // The group epoch is bumped. + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2) + ), + task.result.records() + ); + } + + @Test + public void testConsumerGroupMemberJoinRefreshesExpiredRegexes() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid foooTopicId = Uuid.randomUuid(); + String foooTopicName = "fooo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(1L); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .withMetadataImage(image) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("bar*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(barTopicId, 0, 1, 2))) + .build()) + .withResolvedRegularExpression("foo*", new ResolvedRegularExpression( + Set.of(fooTopicName), 0L, 0L)) + .withResolvedRegularExpression("bar*", new ResolvedRegularExpression( + Set.of(barTopicName), 0L, 0L)) + .withSubscriptionMetadata(Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 3))) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + // Update metadata image. + MetadataImage newImage = new MetadataImageBuilder(image) + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addTopic(foooTopicId, foooTopicName, 1) + .build(2L); + + context.groupMetadataManager.onNewMetadataImage( + newImage, + new MetadataDelta(newImage) + ); + + // A member heartbeats. + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10)); + + // The task is NOT scheduled. + assertFalse(context.executor.isScheduled(groupId + "-regex")); + + // Advance past the batching interval. + context.sleep(11000L); + + // A member heartbeats. + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10)); + + // The task is scheduled. + assertTrue(context.executor.isScheduled(groupId + "-regex")); + + // Execute the pending tasks. + List> tasks = context.processTasks(); + assertEquals(1, tasks.size()); + + // Execute pending tasks. + MockCoordinatorExecutor.ExecutorResult task = tasks.get(0); + assertEquals(groupId + "-regex", task.key); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "foo*", + new ResolvedRegularExpression( + Set.of(fooTopicName, foooTopicName), + 2L, + context.time.milliseconds() + ) + ), + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "bar*", + new ResolvedRegularExpression( + Set.of(barTopicName), + 2L, + context.time.milliseconds() + ) + ), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( + groupId, + Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 3), + foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1) + ) + ), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) + ); + + assertUnorderedListEquals(expectedRecords.subList(0, 2), task.result.records().subList(0, 2)); + assertRecordsEquals(expectedRecords.subList(2, 4), task.result.records().subList(2, 4)); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 0fbecb8cbe8..4230934a499 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor; import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; @@ -398,8 +399,9 @@ public class GroupMetadataManagerTestContext { } public static class Builder { - private final MockTime time = new MockTime(); + private final MockTime time = new MockTime(0, 0, 0); private final MockCoordinatorTimer timer = new MockCoordinatorTimer<>(time); + private final MockCoordinatorExecutor executor = new MockCoordinatorExecutor<>(); private final LogContext logContext = new LogContext(); private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); private MetadataImage metadataImage; @@ -493,6 +495,7 @@ public class GroupMetadataManagerTestContext { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext( time, timer, + executor, snapshotRegistry, metrics, new GroupMetadataManager.Builder() @@ -500,6 +503,7 @@ public class GroupMetadataManagerTestContext { .withLogContext(logContext) .withTime(time) .withTimer(timer) + .withExecutor(executor) .withMetadataImage(metadataImage) .withConsumerGroupHeartbeatInterval(5000) .withConsumerGroupSessionTimeout(45000) @@ -533,6 +537,7 @@ public class GroupMetadataManagerTestContext { final MockTime time; final MockCoordinatorTimer timer; + final MockCoordinatorExecutor executor; final SnapshotRegistry snapshotRegistry; final GroupCoordinatorMetricsShard metrics; final GroupMetadataManager groupMetadataManager; @@ -546,6 +551,7 @@ public class GroupMetadataManagerTestContext { public GroupMetadataManagerTestContext( MockTime time, MockCoordinatorTimer timer, + MockCoordinatorExecutor executor, SnapshotRegistry snapshotRegistry, GroupCoordinatorMetricsShard metrics, GroupMetadataManager groupMetadataManager, @@ -555,6 +561,7 @@ public class GroupMetadataManagerTestContext { ) { this.time = time; this.timer = timer; + this.executor = executor; this.snapshotRegistry = snapshotRegistry; this.metrics = metrics; this.groupMetadataManager = groupMetadataManager; @@ -677,6 +684,16 @@ public class GroupMetadataManagerTestContext { return timeouts; } + public List> processTasks() { + List> results = executor.poll(); + results.forEach(taskResult -> { + if (taskResult.result.replayRecords()) { + taskResult.result.records().forEach(this::replay); + } + }); + return results; + } + public void assertSessionTimeout( String groupId, String memberId, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java index 2f6aaccceba..23a01a60241 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java @@ -27,7 +27,15 @@ import org.apache.kafka.image.MetadataProvenance; import java.util.Arrays; public class MetadataImageBuilder { - private final MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + private final MetadataDelta delta; + + public MetadataImageBuilder() { + this(MetadataImage.EMPTY); + } + + public MetadataImageBuilder(MetadataImage image) { + this.delta = new MetadataDelta(image); + } public MetadataImageBuilder addTopic( Uuid topicId, @@ -61,6 +69,10 @@ public class MetadataImageBuilder { } public MetadataImage build() { - return delta.apply(MetadataProvenance.EMPTY); + return build(0); + } + + public MetadataImage build(long version) { + return delta.apply(new MetadataProvenance(version, 0, 0L, true)); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 4aa3cd7f700..a7e314f3f07 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor; import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.classic.ClassicGroup; @@ -100,6 +101,7 @@ public class OffsetMetadataManagerTest { public static class Builder { private final MockTime time = new MockTime(); private final MockCoordinatorTimer timer = new MockCoordinatorTimer<>(time); + private final MockCoordinatorExecutor executor = new MockCoordinatorExecutor<>(); private final LogContext logContext = new LogContext(); private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); private final GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); @@ -133,6 +135,7 @@ public class OffsetMetadataManagerTest { groupMetadataManager = new GroupMetadataManager.Builder() .withTime(time) .withTimer(timer) + .withExecutor(executor) .withSnapshotRegistry(snapshotRegistry) .withLogContext(logContext) .withMetadataImage(metadataImage) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index 39140289ea4..25ebcd613c8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -1615,26 +1615,39 @@ public class ConsumerGroupTest { ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3") .setSubscribedTopicRegex("regex1") .build(); + ConsumerGroupMember member4 = new ConsumerGroupMember.Builder("member4") + .build(); // Assert the initial state. + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(0, consumerGroup.numSubscribedMembers("regex1")); assertEquals(0, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); // Add member 1. consumerGroup.updateMember(member1); + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(1, consumerGroup.numSubscribedMembers("regex1")); assertEquals(0, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); // Add member 2. consumerGroup.updateMember(member2); + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(1, consumerGroup.numSubscribedMembers("regex1")); assertEquals(1, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); // Add member 3. consumerGroup.updateMember(member3); + assertEquals(0, consumerGroup.numSubscribedMembers("")); + assertEquals(2, consumerGroup.numSubscribedMembers("regex1")); + assertEquals(1, consumerGroup.numSubscribedMembers("regex2")); + assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); + + // Add member 4. + consumerGroup.updateMember(member4); + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(2, consumerGroup.numSubscribedMembers("regex1")); assertEquals(1, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); @@ -1644,24 +1657,28 @@ public class ConsumerGroupTest { .setSubscribedTopicRegex("regex2") .build(); consumerGroup.updateMember(member3); + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(1, consumerGroup.numSubscribedMembers("regex1")); assertEquals(2, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); // Remove member 1. consumerGroup.removeMember(member1.memberId()); + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(0, consumerGroup.numSubscribedMembers("regex1")); assertEquals(2, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); // Remove member 2. consumerGroup.removeMember(member2.memberId()); + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(0, consumerGroup.numSubscribedMembers("regex1")); assertEquals(1, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); // Remove member 3. consumerGroup.removeMember(member3.memberId()); + assertEquals(0, consumerGroup.numSubscribedMembers("")); assertEquals(0, consumerGroup.numSubscribedMembers("regex1")); assertEquals(0, consumerGroup.numSubscribedMembers("regex2")); assertEquals(0, consumerGroup.numSubscribedMembers("regex3")); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java new file mode 100644 index 00000000000..08db52e4e60 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.GroupMetadataManager; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class RegexResolutionBenchmark { + private static final Logger LOG = new LogContext().logger(RegexResolutionBenchmark.class); + private static final Time TIME = Time.SYSTEM; + private static final String GROUP_ID = "my-group-id"; + + private static final List WORDS = List.of( + "data", + "stream", + "queue", + "analytics", + "service", + "event", + "log", + "cloud", + "process", + "system", + "message", + "broker", + "partition", + "key", + "value", + "cluster", + "zookeeper", + "replication", + "topic", + "producer" + ); + + @Param({"10000", "100000", "1000000"}) + private int topicCount; + + @Param({"1", "10", "100"}) + private int regexCount; + + private MetadataImage image; + + private Set regexes; + + @Setup(Level.Trial) + public void setup() { + Random random = new Random(); + + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + for (int i = 0; i < topicCount; i++) { + String topicName = + WORDS.get(random.nextInt(WORDS.size())) + "_" + + WORDS.get(random.nextInt(WORDS.size())) + "_" + + i; + + delta.replay(new TopicRecord() + .setTopicId(Uuid.randomUuid()) + .setName(topicName)); + } + image = delta.apply(MetadataProvenance.EMPTY); + + regexes = new HashSet<>(); + for (int i = 0; i < regexCount; i++) { + regexes.add(".*" + WORDS.get(random.nextInt(WORDS.size())) + ".*"); + } + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void run() { + GroupMetadataManager.refreshRegularExpressions( + GROUP_ID, + LOG, + TIME, + image, + regexes + ); + } +}