diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index cea68e09ffb..be9c4f9abc3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2249,18 +2249,13 @@ public class GroupMetadataManager { .setClassicMemberMetadata(null) .build(); - // If the group is newly created, we must ensure that it moves away from - // epoch 0 and that it is fully initialized. - boolean bumpGroupEpoch = group.groupEpoch() == 0; - - bumpGroupEpoch |= hasMemberSubscriptionChanged( + boolean subscribedTopicNamesChanged = hasMemberSubscriptionChanged( groupId, member, updatedMember, records ); - - bumpGroupEpoch |= maybeUpdateRegularExpressions( + UpdateRegularExpressionsResult updateRegularExpressionsResult = maybeUpdateRegularExpressions( context, group, member, @@ -2268,9 +2263,24 @@ public class GroupMetadataManager { records ); + // The subscription has changed when either the subscribed topic names or subscribed topic + // regex has changed. + boolean hasSubscriptionChanged = subscribedTopicNamesChanged || updateRegularExpressionsResult.regexUpdated(); int groupEpoch = group.groupEpoch(); SubscriptionType subscriptionType = group.subscriptionType(); + boolean bumpGroupEpoch = + // If the group is newly created, we must ensure that it moves away from + // epoch 0 and that it is fully initialized. + groupEpoch == 0 || + // Bumping the group epoch signals that the target assignment should be updated. We bump + // the group epoch when the member has changed its subscribed topic names or the member + // has changed its subscribed topic regex to a regex that is already resolved. We avoid + // bumping the group epoch when the new subscribed topic regex has not been resolved + // yet, since we will have to update the target assignment again later. + subscribedTopicNamesChanged || + updateRegularExpressionsResult == UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { // The subscription metadata is updated in two cases: // 1) The member has updated its subscriptions; @@ -2315,6 +2325,9 @@ public class GroupMetadataManager { group::currentPartitionEpoch, targetAssignmentEpoch, targetAssignment, + group.resolvedRegularExpressions(), + // Force consistency with the subscription when the subscription has changed. + hasSubscriptionChanged, ownedTopicPartitions, records ); @@ -2468,6 +2481,8 @@ public class GroupMetadataManager { group::currentPartitionEpoch, group.assignmentEpoch(), group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()), + group.resolvedRegularExpressions(), + bumpGroupEpoch, toTopicPartitions(subscription.ownedPartitions(), metadataImage), records ); @@ -2511,6 +2526,9 @@ public class GroupMetadataManager { group::currentPartitionEpoch, targetAssignmentEpoch, targetAssignment, + group.resolvedRegularExpressions(), + // Force consistency with the subscription when the subscription has changed. + bumpGroupEpoch, toTopicPartitions(subscription.ownedPartitions(), metadataImage), records ); @@ -2669,6 +2687,8 @@ public class GroupMetadataManager { updatedMember, targetAssignmentEpoch, targetAssignment, + // Force consistency with the subscription when the subscription has changed. + bumpGroupEpoch, records ); @@ -3108,6 +3128,16 @@ public class GroupMetadataManager { return value != null && !value.isEmpty(); } + private enum UpdateRegularExpressionsResult { + NO_CHANGE, + REGEX_UPDATED, + REGEX_UPDATED_AND_RESOLVED; + + public boolean regexUpdated() { + return this == REGEX_UPDATED || this == REGEX_UPDATED_AND_RESOLVED; + } + } + /** * Check whether the member has updated its subscribed topic regular expression and * may trigger the resolution/the refresh of all the regular expressions in the @@ -3119,9 +3149,9 @@ public class GroupMetadataManager { * @param member The old member. * @param updatedMember The new member. * @param records The records accumulator. - * @return Whether a rebalance must be triggered. + * @return The result of the update. */ - private boolean maybeUpdateRegularExpressions( + private UpdateRegularExpressionsResult maybeUpdateRegularExpressions( AuthorizableRequestContext context, ConsumerGroup group, ConsumerGroupMember member, @@ -3134,14 +3164,17 @@ public class GroupMetadataManager { String oldSubscribedTopicRegex = member.subscribedTopicRegex(); String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex(); - boolean bumpGroupEpoch = false; boolean requireRefresh = false; + UpdateRegularExpressionsResult updateRegularExpressionsResult = UpdateRegularExpressionsResult.NO_CHANGE; // Check whether the member has changed its subscribed regex. - if (!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex)) { + boolean subscribedTopicRegexChanged = !Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex); + if (subscribedTopicRegexChanged) { log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.", groupId, memberId, newSubscribedTopicRegex); + updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED; + if (isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) { // If the member was the last one subscribed to the regex, we delete the // resolved regular expression. @@ -3160,7 +3193,9 @@ public class GroupMetadataManager { } else { // If the new regex is already resolved, we trigger a rebalance // by bumping the group epoch. - bumpGroupEpoch = group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent(); + if (group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent()) { + updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; + } } } } @@ -3176,20 +3211,20 @@ public class GroupMetadataManager { // 0. The group is subscribed to regular expressions. We also take the one // that the current may have just introduced. if (!requireRefresh && group.subscribedRegularExpressions().isEmpty()) { - return bumpGroupEpoch; + return updateRegularExpressionsResult; } // 1. There is no ongoing refresh for the group. String key = group.groupId() + "-regex"; if (executor.isScheduled(key)) { - return bumpGroupEpoch; + return updateRegularExpressionsResult; } // 2. The last refresh is older than 10s. If the group does not have any regular // expressions but the current member just brought a new one, we should continue. long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs(); if (currentTimeMs <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) { - return bumpGroupEpoch; + return updateRegularExpressionsResult; } // 3.1 The group has unresolved regular expressions. @@ -3218,7 +3253,7 @@ public class GroupMetadataManager { ); } - return bumpGroupEpoch; + return updateRegularExpressionsResult; } /** @@ -3492,16 +3527,18 @@ public class GroupMetadataManager { /** * Reconciles the current assignment of the member towards the target assignment if needed. * - * @param groupId The group id. - * @param member The member to reconcile. - * @param currentPartitionEpoch The function returning the current epoch of - * a given partition. - * @param targetAssignmentEpoch The target assignment epoch. - * @param targetAssignment The target assignment. - * @param ownedTopicPartitions The list of partitions owned by the member. This - * is reported in the ConsumerGroupHeartbeat API and - * it could be null if not provided. - * @param records The list to accumulate any new records. + * @param groupId The group id. + * @param member The member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param resolvedRegularExpressions The resolved regular expressions. + * @param hasSubscriptionChanged Whether the member has changed its subscription on the current heartbeat. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. * @return The received member if no changes have been made; or a new * member containing the new assignment. */ @@ -3511,15 +3548,20 @@ public class GroupMetadataManager { BiFunction currentPartitionEpoch, int targetAssignmentEpoch, Assignment targetAssignment, + Map resolvedRegularExpressions, + boolean hasSubscriptionChanged, List ownedTopicPartitions, List records ) { - if (member.isReconciledTo(targetAssignmentEpoch)) { + if (!hasSubscriptionChanged && member.isReconciledTo(targetAssignmentEpoch)) { return member; } ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(targetAssignmentEpoch, targetAssignment) + .withHasSubscriptionChanged(hasSubscriptionChanged) + .withResolvedRegularExpressions(resolvedRegularExpressions) .withCurrentPartitionEpoch(currentPartitionEpoch) .withOwnedTopicPartitions(ownedTopicPartitions) .build(); @@ -3556,11 +3598,12 @@ public class GroupMetadataManager { /** * Reconciles the current assignment of the member towards the target assignment if needed. * - * @param groupId The group id. - * @param member The member to reconcile. - * @param targetAssignmentEpoch The target assignment epoch. - * @param targetAssignment The target assignment. - * @param records The list to accumulate any new records. + * @param groupId The group id. + * @param member The member to reconcile. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param hasSubscriptionChanged Whether the member has changed its subscription on the current heartbeat. + * @param records The list to accumulate any new records. * @return The received member if no changes have been made; or a new * member containing the new assignment. */ @@ -3569,14 +3612,17 @@ public class GroupMetadataManager { ShareGroupMember member, int targetAssignmentEpoch, Assignment targetAssignment, + boolean hasSubscriptionChanged, List records ) { - if (member.isReconciledTo(targetAssignmentEpoch)) { + if (!hasSubscriptionChanged && member.isReconciledTo(targetAssignmentEpoch)) { return member; } ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(targetAssignmentEpoch, targetAssignment) + .withHasSubscriptionChanged(hasSubscriptionChanged) .build(); if (!updatedMember.equals(member)) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java index 74a5bd7a2e3..63d61a3b923 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java @@ -19,8 +19,11 @@ package org.apache.kafka.coordinator.group.modern.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.FencedMemberEpochException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.TopicIds; +import org.apache.kafka.coordinator.group.modern.UnionSet; import java.util.HashMap; import java.util.HashSet; @@ -41,6 +44,11 @@ public class CurrentAssignmentBuilder { */ private final ConsumerGroupMember member; + /** + * The metadata image. + */ + private CoordinatorMetadataImage metadataImage = CoordinatorMetadataImage.EMPTY; + /** * The target assignment epoch. */ @@ -51,6 +59,16 @@ public class CurrentAssignmentBuilder { */ private Assignment targetAssignment; + /** + * Whether the member has changed its subscription on the current heartbeat. + */ + private boolean hasSubscriptionChanged; + + /** + * The resolved regular expressions. + */ + private Map resolvedRegularExpressions = Map.of(); + /** * A function which returns the current epoch of a topic-partition or -1 if the * topic-partition is not assigned. The current epoch is the epoch of the current owner. @@ -73,6 +91,19 @@ public class CurrentAssignmentBuilder { this.member = Objects.requireNonNull(member); } + /** + * Sets the metadata image. + * + * @param metadataImage The metadata image. + * @return This object. + */ + public CurrentAssignmentBuilder withMetadataImage( + CoordinatorMetadataImage metadataImage + ) { + this.metadataImage = metadataImage; + return this; + } + /** * Sets the target assignment epoch and the target assignment that the * consumer group member must be reconciled to. @@ -90,6 +121,32 @@ public class CurrentAssignmentBuilder { return this; } + /** + * Sets whether the member has changed its subscription on the current heartbeat. + * + * @param hasSubscriptionChanged If true, always removes unsubscribed topics from the current assignment. + * @return This object. + */ + public CurrentAssignmentBuilder withHasSubscriptionChanged( + boolean hasSubscriptionChanged + ) { + this.hasSubscriptionChanged = hasSubscriptionChanged; + return this; + } + + /** + * Sets the resolved regular expressions. + * + * @param resolvedRegularExpressions The resolved regular expressions. + * @return This object. + */ + public CurrentAssignmentBuilder withResolvedRegularExpressions( + Map resolvedRegularExpressions + ) { + this.resolvedRegularExpressions = resolvedRegularExpressions; + return this; + } + /** * Sets a BiFunction which allows to retrieve the current epoch of a * partition. This is used by the state machine to determine if a @@ -132,12 +189,15 @@ public class CurrentAssignmentBuilder { case STABLE: // When the member is in the STABLE state, we verify if a newer // epoch (or target assignment) is available. If it is, we can - // reconcile the member towards it. Otherwise, we return. + // reconcile the member towards it. Otherwise, we ensure the + // assignment is consistent with the subscribed topics, if changed. if (member.memberEpoch() != targetAssignmentEpoch) { return computeNextAssignment( member.memberEpoch(), member.assignedPartitions() ); + } else if (hasSubscriptionChanged) { + return updateCurrentAssignment(member.assignedPartitions()); } else { return member; } @@ -147,18 +207,27 @@ public class CurrentAssignmentBuilder { // until the member has revoked the necessary partitions. They are // considered revoked when they are not anymore reported in the // owned partitions set in the ConsumerGroupHeartbeat API. + // Additional partitions may need revoking when the member's + // subscription changes. // If the member provides its owned partitions. We verify if it still // owns any of the revoked partitions. If it does, we cannot progress. if (ownsRevokedPartitions(member.partitionsPendingRevocation())) { - return member; + if (hasSubscriptionChanged) { + return updateCurrentAssignment(member.assignedPartitions()); + } else { + return member; + } } // When the member has revoked all the pending partitions, it can // transition to the next epoch (current + 1) and we can reconcile // its state towards the latest target assignment. return computeNextAssignment( - member.memberEpoch() + 1, + // When we enter UNREVOKED_PARTITIONS due to a subscription change, + // we must not advance the member epoch when the new target + // assignment is not available yet. + Math.min(member.memberEpoch() + 1, targetAssignmentEpoch), member.assignedPartitions() ); @@ -215,6 +284,71 @@ public class CurrentAssignmentBuilder { return false; } + /** + * Updates the current assignment, removing any partitions that are not part of the subscribed topics. + * This method is a lot faster than running the full reconciliation logic in computeNextAssignment. + * + * @param memberAssignedPartitions The assigned partitions of the member to use. + * @return A new ConsumerGroupMember. + */ + private ConsumerGroupMember updateCurrentAssignment( + Map> memberAssignedPartitions + ) { + Set subscribedTopicIds = subscribedTopicIds(); + + // Reuse the original map if no topics need to be removed. + Map> newAssignedPartitions; + Map> newPartitionsPendingRevocation; + if (subscribedTopicIds.isEmpty() && member.partitionsPendingRevocation().isEmpty()) { + newAssignedPartitions = Map.of(); + newPartitionsPendingRevocation = memberAssignedPartitions; + } else { + newAssignedPartitions = memberAssignedPartitions; + newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation()); + for (Map.Entry> entry : memberAssignedPartitions.entrySet()) { + if (!subscribedTopicIds.contains(entry.getKey())) { + if (newAssignedPartitions == memberAssignedPartitions) { + newAssignedPartitions = new HashMap<>(memberAssignedPartitions); + newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation()); + } + newAssignedPartitions.remove(entry.getKey()); + newPartitionsPendingRevocation.merge( + entry.getKey(), + entry.getValue(), + (existing, additional) -> { + existing = new HashSet<>(existing); + existing.addAll(additional); + return existing; + } + ); + } + } + } + + if (newAssignedPartitions == memberAssignedPartitions) { + // If no partitions were removed, we can return the member as is. + return member; + } + + if (!newPartitionsPendingRevocation.isEmpty() && ownsRevokedPartitions(newPartitionsPendingRevocation)) { + return new ConsumerGroupMember.Builder(member) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setAssignedPartitions(newAssignedPartitions) + .setPartitionsPendingRevocation(newPartitionsPendingRevocation) + .build(); + } else { + // There were partitions removed, but they were already revoked. + // Keep the member in the current state and shrink the assigned partitions. + + // We do not expect to be in the UNREVOKED_PARTITIONS state here. The full + // reconciliation logic should handle the case where the member has revoked all its + // partitions pending revocation. + return new ConsumerGroupMember.Builder(member) + .setAssignedPartitions(newAssignedPartitions) + .build(); + } + } + /** * Computes the next assignment. * @@ -227,6 +361,8 @@ public class CurrentAssignmentBuilder { int memberEpoch, Map> memberAssignedPartitions ) { + Set subscribedTopicIds = subscribedTopicIds(); + boolean hasUnreleasedPartitions = false; Map> newAssignedPartitions = new HashMap<>(); Map> newPartitionsPendingRevocation = new HashMap<>(); @@ -241,6 +377,11 @@ public class CurrentAssignmentBuilder { Set currentAssignedPartitions = memberAssignedPartitions .getOrDefault(topicId, Set.of()); + // If the member is no longer subscribed to the topic, treat its target assignment as empty. + if (!subscribedTopicIds.contains(topicId)) { + target = Set.of(); + } + // New Assigned Partitions = Previous Assigned Partitions ∩ Target Set assignedPartitions = new HashSet<>(currentAssignedPartitions); assignedPartitions.retainAll(target); @@ -317,4 +458,28 @@ public class CurrentAssignmentBuilder { .build(); } } + + /** + * Gets the set of topic IDs that the member is subscribed to. + * + * @return The set of topic IDs that the member is subscribed to. + */ + private Set subscribedTopicIds() { + Set subscriptions = member.subscribedTopicNames(); + String subscribedTopicRegex = member.subscribedTopicRegex(); + if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) { + ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex); + if (resolvedRegularExpression != null) { + if (subscriptions.isEmpty()) { + subscriptions = resolvedRegularExpression.topics(); + } else if (!resolvedRegularExpression.topics().isEmpty()) { + subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics()); + } + } else { + // Treat an unresolved regex as matching no topics, to be conservative. + } + } + + return new TopicIds(subscriptions, metadataImage); + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java index 38bcfae47e1..98b40340b0a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java @@ -16,10 +16,16 @@ */ package org.apache.kafka.coordinator.group.modern.share; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.TopicIds; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; +import java.util.Set; /** * The ShareGroupAssignmentBuilder class encapsulates the reconciliation engine of the @@ -32,6 +38,11 @@ public class ShareGroupAssignmentBuilder { */ private final ShareGroupMember member; + /** + * The metadata image. + */ + private CoordinatorMetadataImage metadataImage = CoordinatorMetadataImage.EMPTY; + /** * The target assignment epoch. */ @@ -42,6 +53,11 @@ public class ShareGroupAssignmentBuilder { */ private Assignment targetAssignment; + /** + * Whether the member has changed its subscription on the current heartbeat. + */ + private boolean hasSubscriptionChanged; + /** * Constructs the ShareGroupAssignmentBuilder based on the current state of the * provided share group member. @@ -52,6 +68,19 @@ public class ShareGroupAssignmentBuilder { this.member = Objects.requireNonNull(member); } + /** + * Sets the metadata image. + * + * @param metadataImage The metadata image. + * @return This object. + */ + public ShareGroupAssignmentBuilder withMetadataImage( + CoordinatorMetadataImage metadataImage + ) { + this.metadataImage = metadataImage; + return this; + } + /** * Sets the target assignment epoch and the target assignment that the * share group member must be reconciled to. @@ -69,6 +98,19 @@ public class ShareGroupAssignmentBuilder { return this; } + /** + * Sets whether the member has changed its subscription on the current heartbeat. + * + * @param hasSubscriptionChanged If true, always removes unsubscribed topics from the current assignment. + * @return This object. + */ + public ShareGroupAssignmentBuilder withHasSubscriptionChanged( + boolean hasSubscriptionChanged + ) { + this.hasSubscriptionChanged = hasSubscriptionChanged; + return this; + } + /** * Builds the next state for the member or keep the current one if it * is not possible to move forward with the current state. @@ -83,11 +125,38 @@ public class ShareGroupAssignmentBuilder { // when the member is updated. return new ShareGroupMember.Builder(member) .setState(MemberState.STABLE) - .setAssignedPartitions(targetAssignment.partitions()) + // If we have client-side assignors, the latest target assignment may not + // be consistent with the latest subscribed topics, so we must always + // filter the assigned partitions to ensure they are consistent with the + // subscribed topics. + .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames())) .updateMemberEpoch(targetAssignmentEpoch) .build(); + } else if (hasSubscriptionChanged) { + return new ShareGroupMember.Builder(member) + .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames())) + .build(); + } else { + return member; } + } - return member; + private Map> filterAssignedPartitions( + Map> partitions, + Set subscribedTopicNames + ) { + TopicIds subscribedTopicIds = new TopicIds(member.subscribedTopicNames(), metadataImage); + + // Reuse the original map if no topics need to be removed. + Map> filteredPartitions = partitions; + for (Map.Entry> entry : partitions.entrySet()) { + if (!subscribedTopicIds.contains(entry.getKey())) { + if (filteredPartitions == partitions) { + filteredPartitions = new HashMap<>(partitions); + } + filteredPartitions.remove(entry.getKey()); + } + } + return filteredPartitions; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java index 57af4c98fd4..2bb75578c7b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java @@ -74,6 +74,7 @@ public class ShareGroupMember extends ModernGroupMember { this.memberId = Objects.requireNonNull(newMemberId); this.memberEpoch = member.memberEpoch; this.previousMemberEpoch = member.previousMemberEpoch; + this.state = member.state; this.rackId = member.rackId; this.clientId = member.clientId; this.clientHost = member.clientHost; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 957ae7e8147..6521c48532c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -20604,7 +20604,7 @@ public class GroupMetadataManagerTest { .build(); // Member 1 updates its new regular expression. - CoordinatorResult result = context.consumerGroupHeartbeat( + CoordinatorResult result1 = context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) @@ -20620,19 +20620,15 @@ public class GroupMetadataManagerTest { .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List.of( - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(fooTopicId) - .setPartitions(List.of(0, 1, 2, 3, 4, 5)) - )) + .setTopicPartitions(List.of()) ), - result.response() + result1.response() ); ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) .setMemberEpoch(10) - .setPreviousMemberEpoch(0) + .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -20644,10 +20640,12 @@ public class GroupMetadataManagerTest { // The member subscription is updated. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), // The previous regular expression is deleted. - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + // The member assignment is updated. + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) ); - assertRecordsEquals(expectedRecords, result.records()); + assertRecordsEquals(expectedRecords, result1.records()); // Execute pending tasks. List> tasks = context.processTasks(); @@ -20675,6 +20673,65 @@ public class GroupMetadataManagerTest { ), task.result().records() ); + + assignor.prepareGroupAssignment(new GroupAssignment(Map.of( + memberId1, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2) + )) + ))); + + // Member heartbeats again with the same regex. + CoordinatorResult result2 = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignor("range") + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2, 3, 4, 5)), + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(List.of(0, 1, 2))))), + result2.response() + ); + + ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2))) + .build(); + + expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + ); + + assertRecordsEquals(expectedRecords, result2.records()); } @Test @@ -21077,10 +21134,7 @@ public class GroupMetadataManagerTest { .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List.of( - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(fooTopicId) - .setPartitions(List.of(3, 4, 5))))), + .setTopicPartitions(List.of())), result1.response() ); @@ -21098,7 +21152,8 @@ public class GroupMetadataManagerTest { assertRecordsEquals( List.of( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) ), result1.records() ); @@ -21164,8 +21219,7 @@ public class GroupMetadataManagerTest { .setRebalanceTimeoutMs(5000) .setSubscribedTopicRegex("foo|bar*") .setServerAssignorName("range") - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 3, 4, 5))) + .setAssignedPartitions(mkAssignment()) .build(); assertResponseEquals( @@ -21174,10 +21228,7 @@ public class GroupMetadataManagerTest { .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List.of( - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(fooTopicId) - .setPartitions(List.of(3, 4, 5))))), + .setTopicPartitions(List.of())), result2.response() ); @@ -21306,10 +21357,7 @@ public class GroupMetadataManagerTest { .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List.of( - new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(fooTopicId) - .setPartitions(List.of(3, 4, 5))))), + .setTopicPartitions(List.of())), result1.response() ); @@ -21327,7 +21375,8 @@ public class GroupMetadataManagerTest { assertRecordsEquals( List.of( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) ), result1.records() ); @@ -21440,6 +21489,219 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .buildCoordinatorMetadataImage(12345L); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + // Static member temporarily leaves the group. + CoordinatorResult result1 = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setInstanceId(instanceId) + .setMemberId(memberId1) + .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH) + ); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH), + result1.response() + ); + + // Static member joins the group with an updated regular expression. + CoordinatorResult result2 = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setInstanceId(instanceId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignor("range") + .setTopicPartitions(List.of())); + + // The returned assignment does not contain topics not in the current regular expression. + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of()) + ), + result2.response() + ); + + ConsumerGroupMember expectedCopiedMember = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setInstanceId(instanceId) + .setMemberEpoch(0) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2))) + .build(); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setInstanceId(instanceId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .build(); + + List expectedRecords = List.of( + // The previous member is deleted. + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1), + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1), + // The previous member is replaced by the new one. + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedCopiedMember), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedCopiedMember), + // The member subscription is updated. + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + // The previous regular expression is deleted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"), + // The member assignment is updated. + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) + ); + + assertRecordsEquals(expectedRecords, result2.records()); + + // Execute pending tasks. + List> tasks = context.processTasks(); + assertEquals(1, tasks.size()); + + MockCoordinatorExecutor.ExecutorResult task = tasks.get(0); + assertEquals(groupId + "-regex", task.key()); + assertRecordsEquals( + List.of( + // The resolution of the new regex is persisted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "foo*", + new ResolvedRegularExpression( + Set.of("foo"), + 12345L, + context.time.milliseconds() + ) + ), + // The group epoch is bumped. + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))) + ), + task.result().records() + ); + + assignor.prepareGroupAssignment(new GroupAssignment(Map.of( + memberId2, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )) + ))); + + // Member heartbeats again with the same regex. + CoordinatorResult result3 = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setInstanceId(instanceId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignor("range") + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2, 3, 4, 5))))), + result3.response() + ); + + ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setInstanceId(instanceId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + ); + + assertRecordsEquals(expectedRecords, result3.records()); + } + @Test public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() { String groupId = "fooup"; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java index 3a4931efad9..48441780689 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java @@ -19,13 +19,19 @@ package org.apache.kafka.coordinator.group.modern.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.FencedMemberEpochException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; +import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; @@ -36,19 +42,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testStableToStable() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.STABLE) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6)))) @@ -60,6 +75,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) @@ -70,19 +86,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testStableToStableWithNewPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.STABLE) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3, 4), mkTopicAssignment(topicId2, 4, 5, 6, 7)))) @@ -94,6 +119,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3, 4), mkTopicAssignment(topicId2, 4, 5, 6, 7))) @@ -104,19 +130,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testStableToUnrevokedPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.STABLE) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 2, 3, 4), mkTopicAssignment(topicId2, 5, 6, 7)))) @@ -128,6 +163,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.UNREVOKED_PARTITIONS) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) @@ -141,19 +177,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testStableToUnreleasedPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.STABLE) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3, 4), mkTopicAssignment(topicId2, 4, 5, 6, 7)))) @@ -165,6 +210,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.UNRELEASED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) @@ -175,19 +221,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testStableToUnreleasedPartitionsWithOwnedPartitionsNotHavingRevokedPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.STABLE) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 7)))) @@ -202,6 +257,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.UNRELEASED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5))) @@ -212,13 +268,21 @@ public class CurrentAssignmentBuilderTest { @Test public void testUnrevokedPartitionsToStable() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNREVOKED_PARTITIONS) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) @@ -228,6 +292,7 @@ public class CurrentAssignmentBuilderTest { .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6)))) @@ -246,6 +311,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) @@ -256,13 +322,21 @@ public class CurrentAssignmentBuilderTest { @Test public void testRemainsInUnrevokedPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNREVOKED_PARTITIONS) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) @@ -272,6 +346,7 @@ public class CurrentAssignmentBuilderTest { .build(); CurrentAssignmentBuilder currentAssignmentBuilder = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(12, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6)))) @@ -311,15 +386,27 @@ public class CurrentAssignmentBuilderTest { ); } - @Test - public void testUnrevokedPartitionsToUnrevokedPartitions() { + @ParameterizedTest + @CsvSource({ + "10, 12, 11", + "10, 10, 10", // The member epoch must not advance past the target assignment epoch. + }) + public void testUnrevokedPartitionsToUnrevokedPartitions(int memberEpoch, int targetAssignmentEpoch, int expectedMemberEpoch) { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNREVOKED_PARTITIONS) - .setMemberEpoch(10) - .setPreviousMemberEpoch(10) + .setMemberEpoch(memberEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) @@ -329,7 +416,8 @@ public class CurrentAssignmentBuilderTest { .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) - .withTargetAssignment(12, new Assignment(mkAssignment( + .withMetadataImage(metadataImage) + .withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6)))) .withCurrentPartitionEpoch((topicId, partitionId) -> -1) @@ -345,8 +433,9 @@ public class CurrentAssignmentBuilderTest { assertEquals( new ConsumerGroupMember.Builder("member") .setState(MemberState.UNREVOKED_PARTITIONS) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) + .setMemberEpoch(expectedMemberEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6))) @@ -360,19 +449,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testUnrevokedPartitionsToUnreleasedPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNREVOKED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 2, 3, 4), mkTopicAssignment(topicId2, 5, 6, 7)))) @@ -391,6 +489,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.UNRELEASED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) @@ -401,19 +500,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testUnreleasedPartitionsToStable() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNRELEASED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(12, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6)))) @@ -425,6 +533,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(12) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) @@ -435,19 +544,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testUnreleasedPartitionsToStableWithNewPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNRELEASED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 2, 3, 4), mkTopicAssignment(topicId2, 5, 6, 7)))) @@ -459,6 +577,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3, 4), mkTopicAssignment(topicId2, 5, 6, 7))) @@ -469,19 +588,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testUnreleasedPartitionsToUnreleasedPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNRELEASED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 2, 3, 4), mkTopicAssignment(topicId2, 5, 6, 7)))) @@ -493,19 +621,28 @@ public class CurrentAssignmentBuilderTest { @Test public void testUnreleasedPartitionsToUnrevokedPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNRELEASED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 2, 3), mkTopicAssignment(topicId2, 5, 6))) .build(); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(12, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6)))) @@ -517,6 +654,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.UNREVOKED_PARTITIONS) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6))) @@ -530,13 +668,21 @@ public class CurrentAssignmentBuilderTest { @Test public void testUnknownState() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") .setState(MemberState.UNKNOWN) .setMemberEpoch(11) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6))) @@ -548,6 +694,7 @@ public class CurrentAssignmentBuilderTest { // When the member is in an unknown state, the member is first to force // a reset of the client side member state. assertThrows(FencedMemberEpochException.class, () -> new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(12, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6)))) @@ -556,6 +703,7 @@ public class CurrentAssignmentBuilderTest { // Then the member rejoins with no owned partitions. ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(12, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6)))) @@ -568,6 +716,7 @@ public class CurrentAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(12) .setPreviousMemberEpoch(11) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6))) @@ -575,4 +724,355 @@ public class CurrentAssignmentBuilderTest { updatedMember ); } + + @ParameterizedTest + @CsvSource({ + "10, 11, 11, false", // When advancing to a new target assignment, the assignment should + "10, 11, 11, true", // always take the subscription into account. + "10, 10, 10, true", + }) + public void testStableToStableWithAssignmentTopicsNoLongerInSubscription( + int memberEpoch, + int targetAssignmentEpoch, + int expectedMemberEpoch, + boolean hasSubscriptionChanged + ) { + String topic1 = "topic1"; + String topic2 = "topic2"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(memberEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6)))) + .withHasSubscriptionChanged(hasSubscriptionChanged) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId2) + .setPartitions(Arrays.asList(4, 5, 6)))) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(expectedMemberEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @CsvSource({ + "10, 11, 10, false", // When advancing to a new target assignment, the assignment should always + "10, 11, 10, true", // take the subscription into account. + "10, 10, 10, true" + }) + public void testStableToUnrevokedPartitionsWithAssignmentTopicsNoLongerInSubscription( + int memberEpoch, + int targetAssignmentEpoch, + int expectedMemberEpoch, + boolean hasSubscriptionChanged + ) { + String topic1 = "topic1"; + String topic2 = "topic2"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(memberEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6)))) + .withHasSubscriptionChanged(hasSubscriptionChanged) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId1) + .setPartitions(Arrays.asList(1, 2, 3)), + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId2) + .setPartitions(Arrays.asList(4, 5, 6)))) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(expectedMemberEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId2, 4, 5, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .build(), + updatedMember + ); + } + + @Test + public void testRemainsInUnrevokedPartitionsWithAssignmentTopicsNoLongerInSubscription() { + String topic1 = "topic1"; + String topic2 = "topic2"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 2, 3), + mkTopicAssignment(topicId2, 5, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1), + mkTopicAssignment(topicId2, 4))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(12, new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 1, 3, 4), + mkTopicAssignment(topicId2, 6, 7)))) + .withHasSubscriptionChanged(true) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId1) + .setPartitions(Arrays.asList(1, 2, 3)), + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId2) + .setPartitions(Arrays.asList(4, 5, 6)))) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId2, 5, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4))) + .build(), + updatedMember + ); + } + + @Test + public void testSubscribedTopicNameAndUnresolvedRegularExpression() { + String fooTopic = "foo"; + String barTopic = "bar"; + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopic, 10) + .addTopic(barTopicId, barTopic, 10) + .buildCoordinatorMetadataImage(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(fooTopic)) + .setSubscribedTopicRegex("bar*") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(10, new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6)))) + .withHasSubscriptionChanged(true) + .withResolvedRegularExpressions(Map.of()) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(1, 2, 3)), + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(Arrays.asList(4, 5, 6)))) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(fooTopic)) + .setSubscribedTopicRegex("bar*") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(barTopicId, 4, 5, 6))) + .build(), + updatedMember + ); + } + + @Test + public void testUnresolvedRegularExpression() { + String fooTopic = "foo"; + String barTopic = "bar"; + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopic, 10) + .addTopic(barTopicId, barTopic, 10) + .buildCoordinatorMetadataImage(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of()) + .setSubscribedTopicRegex("bar*") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(10, new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6)))) + .withHasSubscriptionChanged(true) + .withResolvedRegularExpressions(Map.of()) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(1, 2, 3)), + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(Arrays.asList(4, 5, 6)))) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of()) + .setSubscribedTopicRegex("bar*") + .setAssignedPartitions(mkAssignment()) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6))) + .build(), + updatedMember + ); + } + + @Test + public void testSubscribedTopicNameAndResolvedRegularExpression() { + String fooTopic = "foo"; + String barTopic = "bar"; + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopic, 10) + .addTopic(barTopicId, barTopic, 10) + .buildCoordinatorMetadataImage(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(fooTopic)) + .setSubscribedTopicRegex("bar*") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(10, new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6)))) + .withHasSubscriptionChanged(true) + .withResolvedRegularExpressions(Map.of( + "bar*", new ResolvedRegularExpression( + Set.of("bar"), + 12345L, + 0L + ) + )) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(1, 2, 3)), + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(barTopicId) + .setPartitions(Arrays.asList(4, 5, 6)))) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(fooTopic)) + .setSubscribedTopicRegex("bar*") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 4, 5, 6))) + .build(), + updatedMember + ); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java index f3b6f4604e6..6edff7fc08e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java @@ -17,10 +17,16 @@ package org.apache.kafka.coordinator.group.modern.share; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; +import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.List; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; @@ -30,19 +36,28 @@ public class ShareGroupAssignmentBuilderTest { @Test public void testStableToStable() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ShareGroupMember member = new ShareGroupMember.Builder("member") .setState(MemberState.STABLE) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) .build(); ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6)))) @@ -53,6 +68,7 @@ public class ShareGroupAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) @@ -63,19 +79,28 @@ public class ShareGroupAssignmentBuilderTest { @Test public void testStableToStableWithNewPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + ShareGroupMember member = new ShareGroupMember.Builder("member") .setState(MemberState.STABLE) .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6))) .build(); ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member) + .withMetadataImage(metadataImage) .withTargetAssignment(11, new Assignment(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3, 4), mkTopicAssignment(topicId2, 4, 5, 6, 7)))) @@ -86,6 +111,7 @@ public class ShareGroupAssignmentBuilderTest { .setState(MemberState.STABLE) .setMemberEpoch(11) .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) .setAssignedPartitions(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3, 4), mkTopicAssignment(topicId2, 4, 5, 6, 7))) @@ -93,4 +119,56 @@ public class ShareGroupAssignmentBuilderTest { updatedMember ); } + + @ParameterizedTest + @CsvSource({ + "10, 11, false", // When advancing to a new target assignment, the assignment should always + "10, 11, true", // take the subscription into account. + "10, 10, true" + }) + public void testStableToStableWithAssignmentTopicsNoLongerInSubscription( + int memberEpoch, + int targetAssignmentEpoch, + boolean hasSubscriptionChanged + ) { + String topic1 = "topic1"; + String topic2 = "topic2"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + + ShareGroupMember member = new ShareGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(memberEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(); + + ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6)))) + .withHasSubscriptionChanged(hasSubscriptionChanged) + .build(); + + assertEquals( + new ShareGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(targetAssignmentEpoch) + .setPreviousMemberEpoch(memberEpoch) + .setSubscribedTopicNames(List.of(topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(), + updatedMember + ); + } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java new file mode 100644 index 00000000000..05ff6c311d7 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class CurrentAssignmentBuilderBenchmark { + + @Param({"5", "50"}) + private int partitionsPerTopic; + + @Param({"10", "100", "1000"}) + private int topicCount; + + private List topicNames; + + private List topicIds; + + private CoordinatorMetadataImage metadataImage; + + private ConsumerGroupMember member; + + private ConsumerGroupMember memberWithUnsubscribedTopics; + + private Assignment targetAssignment; + + @Setup(Level.Trial) + public void setup() { + setupTopics(); + setupMember(); + setupTargetAssignment(); + } + + private void setupTopics() { + topicNames = AssignorBenchmarkUtils.createTopicNames(topicCount); + topicIds = new ArrayList<>(topicCount); + metadataImage = AssignorBenchmarkUtils.createMetadataImage(topicNames, partitionsPerTopic); + + for (String topicName : topicNames) { + Uuid topicId = metadataImage.topicMetadata(topicName).get().id(); + topicIds.add(topicId); + } + } + + private void setupMember() { + Map> assignedPartitions = new HashMap<>(); + for (Uuid topicId : topicIds) { + Set partitions = IntStream.range(0, partitionsPerTopic) + .boxed() + .collect(Collectors.toSet()); + assignedPartitions.put(topicId, partitions); + } + + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(topicNames) + .setAssignedPartitions(assignedPartitions); + + member = memberBuilder.build(); + memberWithUnsubscribedTopics = memberBuilder + .setSubscribedTopicNames(topicNames.subList(0, topicNames.size() - 1)) + .build(); + } + + private void setupTargetAssignment() { + Map> assignedPartitions = new HashMap<>(); + for (Uuid topicId : topicIds) { + Set partitions = IntStream.range(0, partitionsPerTopic) + .boxed() + .collect(Collectors.toSet()); + assignedPartitions.put(topicId, partitions); + } + targetAssignment = new Assignment(assignedPartitions); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public ConsumerGroupMember stableToStableWithNoChange() { + return new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(member.memberEpoch(), targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .build(); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public ConsumerGroupMember stableToStableWithNewTargetAssignment() { + return new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(member.memberEpoch() + 1, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .build(); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public ConsumerGroupMember stableToStableWithSubscriptionChange() { + return new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(member.memberEpoch(), targetAssignment) + .withHasSubscriptionChanged(true) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .build(); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public ConsumerGroupMember stableToUnrevokedPartitionsWithSubscriptionChange() { + return new CurrentAssignmentBuilder(memberWithUnsubscribedTopics) + .withMetadataImage(metadataImage) + .withTargetAssignment(memberWithUnsubscribedTopics.memberEpoch(), targetAssignment) + .withHasSubscriptionChanged(true) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .build(); + } +}