diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 789a3dfb281..03eddbe0ba4 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -54,16 +54,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord val groupId = "grp" val (memberId, _) = joinDynamicConsumerGroupWithOldProtocol(groupId) - // The joining request from a consumer group member is rejected. - val responseData = consumerGroupHeartbeat( - groupId = groupId, - rebalanceTimeoutMs = 5 * 60 * 1000, - subscribedTopicNames = List("foo"), - topicPartitions = List.empty, - expectedError = Errors.GROUP_ID_NOT_FOUND - ) - assertEquals("Group grp is not a consumer group.", responseData.errorMessage) - // The member leaves the group. leaveGroup( groupId = groupId, @@ -133,10 +123,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord val groupId = "grp" val (memberId, _) = joinConsumerGroupWithNewProtocol(groupId) - // The joining request from a classic group member is rejected. - val joinGroupResponseData = sendJoinRequest(groupId = groupId) - assertEquals(Errors.GROUP_ID_NOT_FOUND.code, joinGroupResponseData.errorCode) - // The member leaves the group. leaveGroup( groupId = groupId, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index fb4dc782b6c..66456202a1c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -327,6 +327,14 @@ public class GroupCoordinatorService implements GroupCoordinator { ); } + if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs || + request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs) { + return CompletableFuture.completedFuture(new JoinGroupResponseData() + .setMemberId(request.memberId()) + .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) + ); + } + CompletableFuture responseFuture = new CompletableFuture<>(); runtime.scheduleWriteOperation( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index adf776bde4b..4f4e9df528f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; @@ -86,11 +88,13 @@ import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashSet; import org.slf4j.Logger; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -1174,6 +1178,64 @@ public class GroupMetadataManager { } } + /** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ + private void throwIfClassicProtocolIsNotSupported( + ConsumerGroup group, + String memberId, + String protocolType, + JoinGroupRequestProtocolCollection protocols + ) { + if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { + throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported."); + } + } + + /** + * Deserialize the subscription in JoinGroupRequestProtocolCollection. + * All the protocols have the same subscription, so the method picks a random one. + * + * @param protocols The JoinGroupRequestProtocolCollection. + * @return The Subscription. + */ + private static ConsumerPartitionAssignor.Subscription deserializeSubscription( + JoinGroupRequestProtocolCollection protocols + ) { + try { + return ConsumerProtocol.deserializeSubscription( + ByteBuffer.wrap(protocols.iterator().next().metadata()) + ); + } catch (SchemaException e) { + throw new IllegalStateException("Malformed embedded consumer protocol."); + } + } + + /** + * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list converted from the TopicPartitions list. + */ + private static List toTopicPartitions( + List partitions, + TopicsImage topicsImage + ) { + Map topicPartitionMap = new HashMap<>(); + partitions.forEach(topicPartition -> { + TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); + if (topicImage != null) { + topicPartitionMap + .computeIfAbsent(topicImage.id(), __ -> new ConsumerGroupHeartbeatRequestData.TopicPartitions().setTopicId(topicImage.id())) + .partitions() + .add(topicPartition.partition()); + } + }); + return new ArrayList<>(topicPartitionMap.values()); + } + private ConsumerGroupHeartbeatResponseData.Assignment createResponseAssignment( ConsumerGroupMember member ) { @@ -1266,8 +1328,11 @@ public class GroupMetadataManager { staticMemberReplaced = true; updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) .setAssignedPartitions(member.assignedPartitions()); - removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); - log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + // Remove the member without canceling its timers in case the change is reverted. If the + // change is not reverted, the group validation will fail and the timer will do nothing. + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); } } else { throwIfStaticMemberIsUnknown(member, instanceId); @@ -1293,24 +1358,15 @@ public class GroupMetadataManager { .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)) .setClientId(clientId) .setClientHost(clientHost) + .setClassicMemberMetadata(null) .build(); - boolean bumpGroupEpoch = false; - if (!updatedMember.equals(member)) { - records.add(newMemberSubscriptionRecord(groupId, updatedMember)); - - if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { - log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", - groupId, memberId, updatedMember.subscribedTopicNames()); - bumpGroupEpoch = true; - } - - if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { - log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", - groupId, memberId, updatedMember.subscribedTopicRegex()); - bumpGroupEpoch = true; - } - } + boolean bumpGroupEpoch = hasMemberSubscriptionChanged( + groupId, + member, + updatedMember, + records + ); if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { // The subscription metadata is updated in two cases: @@ -1345,43 +1401,16 @@ public class GroupMetadataManager { int targetAssignmentEpoch = group.assignmentEpoch(); Assignment targetAssignment = group.targetAssignment(memberId); if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { - String preferredServerAssignor = group.computePreferredServerAssignor( + targetAssignment = updateTargetAssignment( + group, + groupEpoch, member, - updatedMember - ).orElse(defaultAssignor.name()); - try { - TargetAssignmentBuilder assignmentResultBuilder = - new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) - .withMembers(group.members()) - .withStaticMembers(group.staticMembers()) - .withSubscriptionMetadata(subscriptionMetadata) - .withTargetAssignment(group.targetAssignment()) - .addOrUpdateMember(memberId, updatedMember); - TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; - // A new static member is replacing an older one with the same subscriptions. - // We just need to remove the older member and add the newer one. The new member should - // reuse the target assignment of the older member. - if (staticMemberReplaced) { - assignmentResult = assignmentResultBuilder - .removeMember(member.memberId()) - .build(); - } else { - assignmentResult = assignmentResultBuilder - .build(); - } - - log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", - groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); - - records.addAll(assignmentResult.records()); - targetAssignment = assignmentResult.targetAssignment().get(memberId); - targetAssignmentEpoch = groupEpoch; - } catch (PartitionAssignorException ex) { - String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", - groupEpoch, ex.getMessage()); - log.error("[GroupId {}] {}.", groupId, msg); - throw new UnknownServerException(msg, ex); - } + updatedMember, + subscriptionMetadata, + staticMemberReplaced, + records + ); + targetAssignmentEpoch = groupEpoch; } // 3. Reconcile the member's assignment with the target assignment if the member is not @@ -1418,6 +1447,227 @@ public class GroupMetadataManager { return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ + private CoordinatorResult classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List records = new ArrayList<>(); + final String groupId = request.groupId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + + String memberId = request.memberId(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + + // Get or create the member. + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + if (instanceId == null) { + // A dynamic member (re-)joins. + if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { + // If member id required, send back a response to call for another join group request with allocated member id. + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(memberId) + .setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) + ); + log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. " + + "Created a new member id {} and requesting the member to rejoin with this id.", groupId, memberId); + return EMPTY_RESULT; + } else { + member = group.getOrMaybeCreateMember(memberId, true); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + // Remove the member without canceling its timers in case the change is reverted. If the + // change is not reverted, the group validation will fail and the timer will do nothing. + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map subscriptionMetadata = group.subscriptionMetadata(); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + final List ownedTopicPartitions = + toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. + ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = hasMemberSubscriptionChanged( + groupId, + member, + updatedMember, + records + ); + + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. + int targetAssignmentEpoch = group.assignmentEpoch(); + Assignment targetAssignment = group.targetAssignment(memberId); + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { + targetAssignment = updateTargetAssignment( + group, + groupEpoch, + member, + updatedMember, + subscriptionMetadata, + staticMemberReplaced, + records + ); + targetAssignmentEpoch = groupEpoch; + } + + // 3. Reconcile the member's assignment with the target assignment if the member is not + // fully reconciled yet. + updatedMember = maybeReconcile( + groupId, + updatedMember, + group::currentPartitionEpoch, + targetAssignmentEpoch, + targetAssignment, + ownedTopicPartitions, + records + ); + + final JoinGroupResponseData response = new JoinGroupResponseData() + .setMemberId(updatedMember.memberId()) + .setGenerationId(updatedMember.memberEpoch()) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName(protocols.iterator().next().name()); + + CompletableFuture appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + if (t == null) { + scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), request.sessionTimeoutMs()); + // The sync timeout ensures that the member send sync request within the rebalance timeout. + scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs()); + + responseFuture.complete(response); + } + }); + + return new CoordinatorResult<>(records, null, appendFuture, true); + } + + /** + * Creates the member subscription record if the updatedMember is different from + * the old member. Returns true if the subscribedTopicNames/subscribedTopicRegex + * has changed. + * + * @param groupId The group id. + * @param member The old member. + * @param updatedMember The updated member. + * @param records The list to accumulate any new records. + * @return A boolean indicating whether the updatedMember has a different + * subscribedTopicNames/subscribedTopicRegex from the old member. + */ + private boolean hasMemberSubscriptionChanged( + String groupId, + ConsumerGroupMember member, + ConsumerGroupMember updatedMember, + List records + ) { + String memberId = updatedMember.memberId(); + if (!updatedMember.equals(member)) { + records.add(newMemberSubscriptionRecord(groupId, updatedMember)); + + if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { + log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", + groupId, memberId, updatedMember.subscribedTopicNames()); + return true; + } + + if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { + log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", + groupId, memberId, updatedMember.subscribedTopicRegex()); + return true; + } + } + return false; + } + /** * Reconciles the current assignment of the member towards the target assignment if needed. * @@ -1461,30 +1711,83 @@ public class GroupMetadataManager { groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), assignmentToString(updatedMember.assignedPartitions()), assignmentToString(updatedMember.partitionsPendingRevocation())); - if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { - scheduleConsumerGroupRebalanceTimeout( - groupId, - updatedMember.memberId(), - updatedMember.memberEpoch(), - updatedMember.rebalanceTimeoutMs() - ); - } else { - cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); + // Schedule/cancel the rebalance timeout if the member uses the consumer protocol. + // The members using classic protocol only have join timer and sync timer. + if (!updatedMember.useClassicProtocol()) { + if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { + scheduleConsumerGroupRebalanceTimeout( + groupId, + updatedMember.memberId(), + updatedMember.memberEpoch(), + updatedMember.rebalanceTimeoutMs() + ); + } else { + cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); + } } } return updatedMember; } - private void removeMemberAndCancelTimers( - List records, - String groupId, - String memberId + /** + * Updates the target assignment according to the updated member and subscription metadata. + * + * @param group The ConsumerGroup. + * @param groupEpoch The group epoch. + * @param member The existing member. + * @param updatedMember The updated member. + * @param subscriptionMetadata The subscription metadata. + * @param staticMemberReplaced The boolean indicating whether the updated member + * is a static member that replaces the existing member. + * @param records The list to accumulate any new records. + * @return The new target assignment. + */ + private Assignment updateTargetAssignment( + ConsumerGroup group, + int groupEpoch, + ConsumerGroupMember member, + ConsumerGroupMember updatedMember, + Map subscriptionMetadata, + boolean staticMemberReplaced, + List records ) { - // Write tombstones for the departed static member. - removeMember(records, groupId, memberId); - // Cancel all the timers of the departed static member. - cancelTimers(groupId, memberId); + String preferredServerAssignor = group.computePreferredServerAssignor( + member, + updatedMember + ).orElse(defaultAssignor.name()); + try { + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(group.groupId(), groupEpoch, assignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()) + .addOrUpdateMember(updatedMember.memberId(), updatedMember); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member should + // reuse the target assignment of the older member. + if (staticMemberReplaced) { + assignmentResult = assignmentResultBuilder + .removeMember(member.memberId()) + .build(); + } else { + assignmentResult = assignmentResultBuilder + .build(); + } + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + group.groupId(), groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + records.addAll(assignmentResult.records()); + return assignmentResult.targetAssignment().get(updatedMember.memberId()); + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", group.groupId(), msg); + throw new UnknownServerException(msg, ex); + } } /** @@ -1623,6 +1926,7 @@ public class GroupMetadataManager { private void cancelTimers(String groupId, String memberId) { cancelConsumerGroupSessionTimeout(groupId, memberId); cancelConsumerGroupRebalanceTimeout(groupId, memberId); + cancelConsumerGroupSyncTimeout(groupId, memberId); } /** @@ -1634,9 +1938,24 @@ public class GroupMetadataManager { private void scheduleConsumerGroupSessionTimeout( String groupId, String memberId + ) { + scheduleConsumerGroupSessionTimeout(groupId, memberId, consumerGroupSessionTimeoutMs); + } + + /** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param sessionTimeoutMs The session timeout. + */ + private void scheduleConsumerGroupSessionTimeout( + String groupId, + String memberId, + int sessionTimeoutMs ) { String key = consumerGroupSessionTimeoutKey(groupId, memberId); - timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { + timer.schedule(key, sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = consumerGroup(groupId); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); @@ -1725,6 +2044,52 @@ public class GroupMetadataManager { timer.cancel(consumerGroupRebalanceTimeoutKey(groupId, memberId)); } + /** + * Schedules a sync timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param rebalanceTimeoutMs The rebalance timeout. + */ + private void scheduleConsumerGroupSyncTimeout( + String groupId, + String memberId, + int rebalanceTimeoutMs + ) { + String key = consumerGroupSyncKey(groupId, memberId); + timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { + try { + ConsumerGroup group = consumerGroup(groupId); + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + log.info("[GroupId {}] Member {} fenced from the group because its session expired.", + groupId, memberId); + + return consumerGroupFenceMember(group, member, null); + } catch (GroupIdNotFoundException ex) { + log.debug("[GroupId {}] Could not fence {} because the group does not exist.", + groupId, memberId); + } catch (UnknownMemberIdException ex) { + log.debug("[GroupId {}] Could not fence {} because the member does not exist.", + groupId, memberId); + } + + return new CoordinatorResult<>(Collections.emptyList()); + }); + } + + /** + * Cancels the sync timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ + private void cancelConsumerGroupSyncTimeout( + String groupId, + String memberId + ) { + timer.cancel(consumerGroupSyncKey(groupId, memberId)); + } + /** * Handles a ConsumerGroupHeartbeat request. * @@ -2230,81 +2595,95 @@ public class GroupMetadataManager { RequestContext context, JoinGroupRequestData request, CompletableFuture responseFuture + ) { + Group group = groups.get(request.groupId(), Long.MAX_VALUE); + if (group != null && group.type() == CONSUMER && !group.isEmpty()) { + // classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups. + // The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup. + return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); + } else { + return classicGroupJoinToClassicGroup(context, request, responseFuture); + } + } + + /** + * Handle a JoinGroupRequest to a ClassicGroup or a group to be created. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ + CoordinatorResult classicGroupJoinToClassicGroup( + RequestContext context, + JoinGroupRequestData request, + CompletableFuture responseFuture ) { CoordinatorResult result = EMPTY_RESULT; List records = new ArrayList<>(); String groupId = request.groupId(); String memberId = request.memberId(); - int sessionTimeoutMs = request.sessionTimeoutMs(); - if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs || - sessionTimeoutMs > classicGroupMaxSessionTimeoutMs - ) { + boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + // Group is created if it does not exist and the member id is UNKNOWN. if member + // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND + ClassicGroup group; + maybeDeleteEmptyConsumerGroup(groupId, records); + boolean isNewGroup = !groups.containsKey(groupId); + try { + group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember); + } catch (Throwable t) { responseFuture.complete(new JoinGroupResponseData() .setMemberId(memberId) - .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) + .setErrorCode(Errors.forException(t).code()) + ); + return EMPTY_RESULT; + } + + if (!acceptJoiningMember(group, memberId)) { + group.remove(memberId); + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(UNKNOWN_MEMBER_ID) + .setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code()) + ); + } else if (isUnknownMember) { + result = classicGroupJoinNewMember( + context, + request, + group, + responseFuture ); } else { - boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); - // Group is created if it does not exist and the member id is UNKNOWN. if member - // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND - ClassicGroup group; - maybeDeleteEmptyConsumerGroup(groupId, records); - boolean isNewGroup = !groups.containsKey(groupId); - try { - group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember); - } catch (Throwable t) { - responseFuture.complete(new JoinGroupResponseData() - .setMemberId(memberId) - .setErrorCode(Errors.forException(t).code()) - ); - return EMPTY_RESULT; - } + result = classicGroupJoinExistingMember( + context, + request, + group, + responseFuture + ); + } - if (!acceptJoiningMember(group, memberId)) { - group.remove(memberId); - responseFuture.complete(new JoinGroupResponseData() - .setMemberId(UNKNOWN_MEMBER_ID) - .setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code()) - ); - } else if (isUnknownMember) { - result = classicGroupJoinNewMember( - context, - request, - group, - responseFuture - ); - } else { - result = classicGroupJoinExistingMember( - context, - request, - group, - responseFuture - ); - } + if (isNewGroup && result == EMPTY_RESULT) { + // If there are no records to append and if a group was newly created, we need to append + // records to the log to commit the group to the timeline data structure. + CompletableFuture appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + if (t != null) { + // We failed to write the empty group metadata. This will revert the snapshot, removing + // the newly created group. + log.warn("Failed to write empty metadata for group {}: {}", group.groupId(), t.getMessage()); - if (isNewGroup && result == EMPTY_RESULT) { - // If there are no records to append and if a group was newly created, we need to append - // records to the log to commit the group to the timeline data structure. - CompletableFuture appendFuture = new CompletableFuture<>(); - appendFuture.whenComplete((__, t) -> { - if (t != null) { - // We failed to write the empty group metadata. This will revert the snapshot, removing - // the newly created group. - log.warn("Failed to write empty metadata for group {}: {}", group.groupId(), t.getMessage()); + responseFuture.complete(new JoinGroupResponseData() + .setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(t)).code())); + } + }); - responseFuture.complete(new JoinGroupResponseData() - .setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(t)).code())); - } - }); + records.add( + RecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion()) + ); - records.add( - RecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion()) - ); - - return new CoordinatorResult<>(records, appendFuture, false); - } + return new CoordinatorResult<>(records, appendFuture, false); } return result; } @@ -3975,4 +4354,18 @@ public class GroupMetadataManager { static String classicGroupSyncKey(String groupId) { return "sync-" + groupId; } + + /** + * Generate a consumer group sync key for the timer. + * + * Package private for testing. + * + * @param groupId The group id. + * @param memberId The member id. + * + * @return the sync key. + */ + static String consumerGroupSyncKey(String groupId, String memberId) { + return "sync-" + groupId + "-" + memberId; + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 20b4428fd90..0842ab520c3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -1213,14 +1213,16 @@ public class ConsumerGroup implements Group { * @return A boolean based on the condition mentioned above. */ public boolean supportsClassicProtocols(String memberProtocolType, Set memberProtocols) { - if (isEmpty()) { - return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty(); - } else { - return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) && - memberProtocols.stream().anyMatch( + if (ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType)) { + if (isEmpty()) { + return !memberProtocols.isEmpty(); + } else { + return memberProtocols.stream().anyMatch( name -> classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == numClassicProtocolMembers() ); + } } + return false; } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java index f4306d95fcf..78ec0da23ec 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -147,24 +147,10 @@ public class CurrentAssignmentBuilder { // considered revoked when they are not anymore reported in the // owned partitions set in the ConsumerGroupHeartbeat API. - // If the member does not provide its owned partitions. We cannot - // progress. - if (ownedTopicPartitions == null) { - return member; - } - // If the member provides its owned partitions. We verify if it still // owns any of the revoked partitions. If it does, we cannot progress. - for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { - for (Integer partitionId : topicPartitions.partitions()) { - boolean stillHasRevokedPartition = member - .partitionsPendingRevocation() - .getOrDefault(topicPartitions.topicId(), Collections.emptySet()) - .contains(partitionId); - if (stillHasRevokedPartition) { - return member; - } - } + if (ownsRevokedPartitions(member.partitionsPendingRevocation())) { + return member; } // When the member has revoked all the pending partitions, it can @@ -203,6 +189,31 @@ public class CurrentAssignmentBuilder { return member; } + /** + * Decides whether the current ownedTopicPartitions contains any partition that is pending revocation. + * + * @param assignment The assignment that has the partitions pending revocation. + * @return A boolean based on the condition mentioned above. + */ + private boolean ownsRevokedPartitions( + Map> assignment + ) { + if (ownedTopicPartitions == null) return true; + + for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { + Set partitionsPendingRevocation = + assignment.getOrDefault(topicPartitions.topicId(), Collections.emptySet()); + + for (Integer partitionId : topicPartitions.partitions()) { + if (partitionsPendingRevocation.contains(partitionId)) { + return true; + } + } + } + + return false; + } + /** * Computes the next assignment. * @@ -257,7 +268,7 @@ public class CurrentAssignmentBuilder { } } - if (!newPartitionsPendingRevocation.isEmpty()) { + if (!newPartitionsPendingRevocation.isEmpty() && ownsRevokedPartitions(newPartitionsPendingRevocation)) { // If there are partitions to be revoked, the member remains in its current // epoch and requests the revocation of those partitions. It transitions to // the UNREVOKED_PARTITIONS state to wait until the client acknowledges the diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java index 9482f067a83..fdfe26ddec2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java @@ -164,9 +164,9 @@ public class Assertions { // The order of the racks stored in the PartitionMetadata of the ConsumerGroupPartitionMetadataValue // is not always guaranteed. Therefore, we need a special comparator. ConsumerGroupPartitionMetadataValue expectedValue = - (ConsumerGroupPartitionMetadataValue) expected.message(); + (ConsumerGroupPartitionMetadataValue) expected.message().duplicate(); ConsumerGroupPartitionMetadataValue actualValue = - (ConsumerGroupPartitionMetadataValue) actual.message(); + (ConsumerGroupPartitionMetadataValue) actual.message().duplicate(); List expectedTopicMetadataList = expectedValue.topics(); @@ -177,6 +177,9 @@ public class Assertions { fail("Topic metadata lists have different sizes"); } + expectedTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId)); + actualTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId)); + for (int i = 0; i < expectedTopicMetadataList.size(); i++) { ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = expectedTopicMetadataList.get(i); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 3c27cfd5466..2450d473f8b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -372,7 +372,8 @@ public class GroupCoordinatorServiceTest { ); JoinGroupRequestData request = new JoinGroupRequestData() - .setGroupId("foo"); + .setGroupId("foo") + .setSessionTimeoutMs(1000); service.startup(() -> 1); @@ -405,7 +406,8 @@ public class GroupCoordinatorServiceTest { ); JoinGroupRequestData request = new JoinGroupRequestData() - .setGroupId("foo"); + .setGroupId("foo") + .setSessionTimeoutMs(1000); service.startup(() -> 1); @@ -501,6 +503,38 @@ public class GroupCoordinatorServiceTest { ); } + @ParameterizedTest + @ValueSource(ints = {120 - 1, 10 * 5 * 1000 + 1}) + public void testJoinGroupInvalidSessionTimeout(int sessionTimeoutMs) throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorConfig config = createConfig(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + config, + runtime, + new GroupCoordinatorMetrics() + ); + service.startup(() -> 1); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withSessionTimeoutMs(sessionTimeoutMs) + .build(); + + CompletableFuture future = service.joinGroup( + requestContext(ApiKeys.JOIN_GROUP), + request, + BufferSupplier.NO_CACHING + ); + + assertEquals( + new JoinGroupResponseData() + .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()), + future.get() + ); + } + @Test public void testSyncGroup() { CoordinatorRuntime runtime = mockRuntime(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 83460c4e0d7..1d90a25cc12 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.FencedMemberEpochException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupMaxSizeReachedException; import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.InconsistentGroupProtocolException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -36,6 +37,7 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.errors.UnreleasedInstanceIdException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.ConsumerProtocolSubscription; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -298,20 +300,17 @@ public class GroupMetadataManagerTest { // Use a static member id as it makes the test easier. String memberId = Uuid.randomUuid().toString(); - MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .build(); - assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); - // A first member joins to create the group. context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId) .setMemberEpoch(0) - .setServerAssignor("range") + .setServerAssignor(NoOpPartitionAssignor.NAME) .setRebalanceTimeoutMs(5000) .setSubscribedTopicNames(Arrays.asList("foo", "bar")) .setTopicPartitions(Collections.emptyList())); @@ -3875,45 +3874,6 @@ public class GroupMetadataManagerTest { .forEach(memberId -> assertTrue(group.hasMemberId(memberId))); } - @Test - public void testJoinGroupSessionTimeoutTooSmall() throws Exception { - int minSessionTimeout = 50; - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withClassicGroupMinSessionTimeoutMs(minSessionTimeout) - .build(); - - JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() - .withGroupId("group-id") - .withMemberId(UNKNOWN_MEMBER_ID) - .withSessionTimeoutMs(minSessionTimeout - 1) - .build(); - - GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); - assertTrue(joinResult.joinFuture.isDone()); - assertTrue(joinResult.records.isEmpty()); - assertEquals(Errors.INVALID_SESSION_TIMEOUT.code(), joinResult.joinFuture.get().errorCode()); - } - - @Test - public void testJoinGroupSessionTimeoutTooLarge() throws Exception { - int maxSessionTimeout = 50; - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout) - .build(); - - JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() - .withGroupId("group-id") - .withMemberId(UNKNOWN_MEMBER_ID) - .withSessionTimeoutMs(maxSessionTimeout + 1) - .build(); - - GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); - - assertTrue(joinResult.records.isEmpty()); - assertTrue(joinResult.joinFuture.isDone()); - assertEquals(Errors.INVALID_SESSION_TIMEOUT.code(), joinResult.joinFuture.get().errorCode()); - } - @Test public void testJoinGroupUnknownMemberNewGroup() throws Exception { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -9390,10 +9350,8 @@ public class GroupMetadataManagerTest { public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() { String classicGroupId = "classic-group-id"; String memberId = Uuid.randomUuid().toString(); - MockPartitionAssignor assignor = new MockPartitionAssignor("range"); - assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .build(); ClassicGroup classicGroup = new ClassicGroup( new LogContext(), @@ -9411,7 +9369,7 @@ public class GroupMetadataManagerTest { .setGroupId(classicGroupId) .setMemberId(memberId) .setMemberEpoch(0) - .setServerAssignor("range") + .setServerAssignor(NoOpPartitionAssignor.NAME) .setRebalanceTimeoutMs(5000) .setSubscribedTopicNames(Arrays.asList("foo", "bar")) .setTopicPartitions(Collections.emptyList()))); @@ -9421,10 +9379,8 @@ public class GroupMetadataManagerTest { public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { String classicGroupId = "classic-group-id"; String memberId = Uuid.randomUuid().toString(); - MockPartitionAssignor assignor = new MockPartitionAssignor("range"); - assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .build(); ClassicGroup classicGroup = new ClassicGroup( new LogContext(), @@ -9440,7 +9396,7 @@ public class GroupMetadataManagerTest { .setGroupId(classicGroupId) .setMemberId(memberId) .setMemberEpoch(0) - .setServerAssignor("range") + .setServerAssignor(NoOpPartitionAssignor.NAME) .setRebalanceTimeoutMs(5000) .setSubscribedTopicNames(Arrays.asList("foo", "bar")) .setTopicPartitions(Collections.emptyList())); @@ -9453,7 +9409,7 @@ public class GroupMetadataManagerTest { .setClientId("client") .setClientHost("localhost/127.0.0.1") .setSubscribedTopicNames(Arrays.asList("foo", "bar")) - .setServerAssignorName("range") + .setServerAssignorName(NoOpPartitionAssignor.NAME) .setAssignedPartitions(Collections.emptyMap()) .build(); @@ -9475,29 +9431,6 @@ public class GroupMetadataManagerTest { ); } - @Test - public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws Exception { - String consumerGroupId = "consumer-group-id"; - String memberId = Uuid.randomUuid().toString(); - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10) - .withMember(new ConsumerGroupMember.Builder(memberId) - .setState(MemberState.STABLE) - .setMemberEpoch(10) - .setPreviousMemberEpoch(10) - .build())) - .build(); - - JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() - .withGroupId(consumerGroupId) - .withMemberId(UNKNOWN_MEMBER_ID) - .withDefaultProtocolTypeAndProtocols() - .build(); - - GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); - assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), joinResult.joinFuture.get().errorCode()); - } - @Test public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception { String consumerGroupId = "consumer-group-id"; @@ -9630,12 +9563,9 @@ public class GroupMetadataManagerTest { .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) .setRebalanceTimeoutMs(10000) .setSupportedClassicProtocols(protocols) - .setAssignedPartitions(new HashMap>() { - { - put(fooTopicId, new HashSet<>(Collections.singletonList(0))); - put(barTopicId, new HashSet<>(Collections.singletonList(0))); - } - }) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) .build(); ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) @@ -9682,16 +9612,8 @@ public class GroupMetadataManagerTest { // Newly joining member 2 bumps the group epoch. A new target assignment is computed. RecordHelpers.newGroupEpochRecord(groupId, 1), - RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, new HashMap>() { - { - put(barTopicId, new HashSet<>(Collections.singletonList(0))); - } - }), - RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, new HashMap>() { - { - put(fooTopicId, new HashSet<>(Collections.singletonList(0))); - } - }), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), RecordHelpers.newTargetAssignmentEpochRecord(groupId, 1), // Member 2 has no pending revoking partition. Bump its member epoch and transition to UNRELEASED_PARTITIONS. @@ -9859,11 +9781,8 @@ public class GroupMetadataManagerTest { .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) .setRebalanceTimeoutMs(10000) .setSupportedClassicProtocols(protocols1) - .setAssignedPartitions(new HashMap>() { - { - put(fooTopicId, new HashSet<>(Arrays.asList(0, 1))); - } - }) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) .build(); ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) @@ -9874,11 +9793,8 @@ public class GroupMetadataManagerTest { .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) .setRebalanceTimeoutMs(10000) .setSupportedClassicProtocols(protocols2) - .setAssignedPartitions(new HashMap>() { - { - put(barTopicId, new HashSet<>(Collections.singletonList(0))); - } - }) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(barTopicId, 0))) .build(); ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) @@ -9932,16 +9848,8 @@ public class GroupMetadataManagerTest { // Newly joining member 3 bumps the group epoch. A new target assignment is computed. RecordHelpers.newGroupEpochRecord(groupId, 1), - RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, new HashMap>() { - { - put(fooTopicId, new HashSet<>(Collections.singletonList(0))); - } - }), - RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, new HashMap>() { - { - put(fooTopicId, new HashSet<>(Collections.singletonList(1))); - } - }), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)), RecordHelpers.newTargetAssignmentEpochRecord(groupId, 1), // Member 3 has no pending revoking partition. Bump its member epoch and transition to UNRELEASED_PARTITIONS. @@ -10130,11 +10038,8 @@ public class GroupMetadataManagerTest { .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) .setRebalanceTimeoutMs(10000) .setSupportedClassicProtocols(protocols1) - .setAssignedPartitions(new HashMap>() { - { - put(fooTopicId, new HashSet<>(Arrays.asList(0, 1))); - } - }) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) .build(); ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) @@ -10145,11 +10050,8 @@ public class GroupMetadataManagerTest { .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) .setRebalanceTimeoutMs(10000) .setSupportedClassicProtocols(protocols2) - .setAssignedPartitions(new HashMap>() { - { - put(barTopicId, new HashSet<>(Collections.singletonList(0))); - } - }) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(barTopicId, 0))) .build(); ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) @@ -10203,16 +10105,8 @@ public class GroupMetadataManagerTest { // Newly joining member 3 bumps the group epoch. A new target assignment is computed. RecordHelpers.newGroupEpochRecord(groupId, 2), - RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, new HashMap>() { - { - put(fooTopicId, new HashSet<>(Collections.singletonList(0))); - } - }), - RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, new HashMap>() { - { - put(fooTopicId, new HashSet<>(Collections.singletonList(1))); - } - }), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)), RecordHelpers.newTargetAssignmentEpochRecord(groupId, 2), // Member 3 has no pending revoking partition. Bump its member epoch and transition to UNRELEASED_PARTITIONS. @@ -10921,6 +10815,1088 @@ public class GroupMetadataManagerTest { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testJoiningConsumerGroupWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + // The first round of join request gets the new member id. + GroupMetadataManagerTestContext.JoinResult firstJoinResult = context.sendClassicGroupJoin( + request, + true + ); + assertTrue(firstJoinResult.records.isEmpty()); + // Simulate a successful write to the log. + firstJoinResult.appendFuture.complete(null); + + assertTrue(firstJoinResult.joinFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), firstJoinResult.joinFuture.get().errorCode()); + String newMemberId = firstJoinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + // Send second join group request for a new dynamic member with the new member id. + GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + secondJoinResult.appendFuture.complete(null); + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + } + + @Test + public void testJoiningConsumerGroupFailingToPersistRecords() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String memberId = Uuid.randomUuid().toString(); + String newMemberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.commit(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(newMemberId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a failed write to the log. + joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + context.rollback(); + + context.assertNoSessionTimeout(groupId, newMemberId); + context.assertNoSyncTimeout(groupId, newMemberId); + assertFalse(context.groupMetadataManager.consumerGroup(groupId).hasMember(newMemberId)); + } + + @Test + public void testJoiningConsumerGroupWithNewStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a successful write to log. + joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords, joinResult.records); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testJoiningConsumerGroupReplacingExistingStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + // The static member joins with UNKNOWN_MEMBER_ID. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request, + true + ); + + // Simulate a successful write to log. + joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List expectedRecords = Arrays.asList( + // Remove the old static member. + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // Create the new static member. + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0, 1))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords, joinResult.records); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testJoiningConsumerGroupWithExistingStaticMemberAndNewSubscription() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0), + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setState(MemberState.STABLE) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName, zarTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + + List expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0))), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0), + mkTopicAssignment(fooTopicId, 1))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 2), joinResult.records.subList(0, 2)); + assertUnorderedListEquals(expectedRecords.subList(2, 4), joinResult.records.subList(2, 4)); + assertRecordsEquals(expectedRecords.subList(4, 6), joinResult.records.subList(4, 6)); + + joinResult.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testStaticMemberJoiningConsumerGroupWithUnknownInstanceId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + // Set up a ConsumerGroup with no static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with an instance id. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testStaticMemberJoiningConsumerGroupWithUnmatchedMemberId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with the same instance id and a different member id. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testReconciliationInJoiningConsumerGroupWithEagerProtocol() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(barTopicName, 0)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + // Prepare the new target assignment. + // Member 1 will need to revoke bar-0, and member 2 will need to revoke foo-1. + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list and an empty owned + // partition, and transitions to UNRELEASED_PARTITIONS. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult1 = context.sendClassicGroupJoin(request); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setState(MemberState.UNRELEASED_PARTITIONS) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName, zarTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + + List expectedRecords1 = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember1), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0))), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember1) + ); + assertEquals(expectedRecords1.size(), joinResult1.records.size()); + assertRecordsEquals(expectedRecords1.subList(0, 3), joinResult1.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords1.subList(3, 5), joinResult1.records.subList(3, 5)); + assertRecordsEquals(expectedRecords1.subList(5, 7), joinResult1.records.subList(5, 7)); + + assertEquals(expectedMember1.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult1.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult1.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + + // Member 2 heartbeats to confirm revoking foo-1. + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setTopicPartitions(Collections.emptyList()) + ); + + // Member 1 rejoins to transition from UNRELEASED_PARTITIONS to STABLE. + GroupMetadataManagerTestContext.JoinResult joinResult2 = context.sendClassicGroupJoin(request); + ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(expectedMember1) + .setState(MemberState.STABLE) + .setPreviousMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0))) + .build(); + + assertRecordsEquals( + Collections.singletonList(RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember2)), + joinResult2.records + ); + assertEquals(expectedMember2.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult2.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult2.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(barTopicName, 0)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + // Prepare the new target assignment. + // Member 1 will need to revoke bar-0, and member 2 will need to revoke foo-1. + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list and transitions to UNREVOKED_PARTITIONS. + JoinGroupRequestData request1 = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(barTopicName, 0)))) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult1 = context.sendClassicGroupJoin(request1); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName, zarTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(barTopicName, 0)))) + .build(); + + List expectedRecords1 = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember1), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0))), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember1) + ); + assertEquals(expectedRecords1.size(), joinResult1.records.size()); + assertRecordsEquals(expectedRecords1.subList(0, 3), joinResult1.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords1.subList(3, 5), joinResult1.records.subList(3, 5)); + assertRecordsEquals(expectedRecords1.subList(5, 7), joinResult1.records.subList(5, 7)); + + assertEquals(expectedMember1.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult1.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult1.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request1.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request1.rebalanceTimeoutMs()); + + // Member 1 rejoins to transition from UNREVOKED_PARTITIONS to UNRELEASED_PARTITIONS. + JoinGroupRequestData request2 = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.singletonList(new TopicPartition(fooTopicName, 0)))) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult2 = context.sendClassicGroupJoin(request2); + + ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(expectedMember1) + .setMemberEpoch(11) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setPartitionsPendingRevocation(Collections.emptyMap()) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.singletonList(new TopicPartition(fooTopicName, 0)))) + .build(); + + assertRecordsEquals( + Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember2), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember2) + ), + joinResult2.records + ); + assertEquals(expectedMember2.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult2.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult2.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request2.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request2.rebalanceTimeoutMs()); + + // Member 2 heartbeats to confirm revoking foo-1. + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setTopicPartitions(Collections.emptyList()) + ); + + // Member 1 rejoins to transition from UNRELEASED_PARTITIONS to STABLE. + JoinGroupRequestData request3 = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(zarTopicName, 0)))) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult3 = context.sendClassicGroupJoin(request3); + + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(expectedMember2) + .setState(MemberState.STABLE) + .setPreviousMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(zarTopicName, 0)))) + .build(); + + assertRecordsEquals( + Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) + ), + joinResult3.records + ); + assertEquals(expectedMember3.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult3.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult3.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request3.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request3.rebalanceTimeoutMs()); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 1d94fe485f8..e5932935f2e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -18,10 +18,12 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.ConsumerProtocolSubscription; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -87,6 +89,7 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESU import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey; +import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSyncKey; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY; @@ -121,6 +124,34 @@ public class GroupMetadataManagerTestContext { return protocols; } + public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toConsumerProtocol( + List topicNames, + List ownedPartitions + ) { + return toConsumerProtocol(topicNames, ownedPartitions, ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION); + } + + public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toConsumerProtocol( + List topicNames, + List ownedPartitions, + short version + ) { + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + new JoinGroupRequestData.JoinGroupRequestProtocolCollection(0); + protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + topicNames, + null, + ownedPartitions + ), + version + ).array()) + ); + return protocols; + } + public static Record newGroupMetadataRecord( String groupId, GroupMetadataValue value, @@ -586,6 +617,27 @@ public class GroupMetadataManagerTestContext { assertNull(timeout); } + public MockCoordinatorTimer.ScheduledTimeout assertSyncTimeout( + String groupId, + String memberId, + long delayMs + ) { + MockCoordinatorTimer.ScheduledTimeout timeout = + timer.timeout(consumerGroupSyncKey(groupId, memberId)); + assertNotNull(timeout); + assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs); + return timeout; + } + + public void assertNoSyncTimeout( + String groupId, + String memberId + ) { + MockCoordinatorTimer.ScheduledTimeout timeout = + timer.timeout(consumerGroupSyncKey(groupId, memberId)); + assertNull(timeout); + } + ClassicGroup createClassicGroup(String groupId) { return groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, true); } @@ -642,6 +694,10 @@ public class GroupMetadataManagerTestContext { responseFuture ); + if (coordinatorResult.replayRecords()) { + coordinatorResult.records().forEach(this::replay); + } + return new JoinResult(responseFuture, coordinatorResult); } @@ -789,6 +845,10 @@ public class GroupMetadataManagerTestContext { responseFuture ); + if (coordinatorResult.replayRecords()) { + coordinatorResult.records().forEach(this::replay); + } + return new SyncResult(responseFuture, coordinatorResult); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java index 18bd6e51f22..736ee4395a5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java @@ -16,12 +16,17 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + public class MockPartitionAssignor implements PartitionAssignor { private final String name; private GroupAssignment prepareGroupAssignment = null; @@ -43,4 +48,9 @@ public class MockPartitionAssignor implements PartitionAssignor { public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException { return prepareGroupAssignment; } + + public Map> targetPartitions(String memberId) { + Objects.requireNonNull(prepareGroupAssignment); + return prepareGroupAssignment.members().get(memberId).targetPartitions(); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java new file mode 100644 index 00000000000..05b8cdc4c76 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Map; +import java.util.stream.Collectors; + +public class NoOpPartitionAssignor implements PartitionAssignor { + static final String NAME = "no-op"; + + @Override + public String name() { + return NAME; + } + + @Override + public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + return new GroupAssignment(assignmentSpec.members().entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> new MemberAssignment(entry.getValue().assignedPartitions()) + ))); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java index c1db6f22890..0634ea77e10 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java @@ -170,6 +170,43 @@ public class CurrentAssignmentBuilderTest { ); } + @Test + public void testStableToUnreleasedPartitionsWithOwnedPartitionsNotHavingRevokedPartitions() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 7)))) + .withCurrentPartitionEpoch((topicId, __) -> + topicId2.equals(topicId) ? 10 : -1 + ) + .withOwnedTopicPartitions(Collections.emptyList()) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.UNRELEASED_PARTITIONS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5))) + .build(), + updatedMember + ); + } + @Test public void testUnrevokedPartitionsToStable() { Uuid topicId1 = Uuid.randomUuid();