KAFKA-16615; JoinGroup API for upgrading ConsumerGroup (#15798)

The patch implements the JoinGroup API for the new consumer groups. It allows members using the classic rebalance protocol with the consumer embedded protocol to join a new consumer group.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Dongnuo Lyu 2024-05-07 02:59:10 -04:00 committed by GitHub
parent d76352e215
commit 459eaec666
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1869 additions and 304 deletions

View File

@ -54,16 +54,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
val groupId = "grp"
val (memberId, _) = joinDynamicConsumerGroupWithOldProtocol(groupId)
// The joining request from a consumer group member is rejected.
val responseData = consumerGroupHeartbeat(
groupId = groupId,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty,
expectedError = Errors.GROUP_ID_NOT_FOUND
)
assertEquals("Group grp is not a consumer group.", responseData.errorMessage)
// The member leaves the group.
leaveGroup(
groupId = groupId,
@ -133,10 +123,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
val groupId = "grp"
val (memberId, _) = joinConsumerGroupWithNewProtocol(groupId)
// The joining request from a classic group member is rejected.
val joinGroupResponseData = sendJoinRequest(groupId = groupId)
assertEquals(Errors.GROUP_ID_NOT_FOUND.code, joinGroupResponseData.errorCode)
// The member leaves the group.
leaveGroup(
groupId = groupId,

View File

@ -327,6 +327,14 @@ public class GroupCoordinatorService implements GroupCoordinator {
);
}
if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs ||
request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs) {
return CompletableFuture.completedFuture(new JoinGroupResponseData()
.setMemberId(request.memberId())
.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
);
}
CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
runtime.scheduleWriteOperation(

View File

@ -16,8 +16,10 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@ -86,11 +88,13 @@ import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -1174,6 +1178,64 @@ public class GroupMetadataManager {
}
}
/**
 * Rejects a classic-protocol join whose protocol type or protocol names are not
 * accepted by the consumer group.
 *
 * @param group         The ConsumerGroup the member wants to join.
 * @param memberId      The id of the joining member; only used in the error message.
 * @param protocolType  The protocol type advertised by the joining member.
 * @param protocols     The protocol collection advertised by the joining member.
 *
 * @throws org.apache.kafka.common.errors.ApiException the exception mapped to
 *         INCONSISTENT_GROUP_PROTOCOL when the group does not support the protocols.
 */
private void throwIfClassicProtocolIsNotSupported(
    ConsumerGroup group,
    String memberId,
    String protocolType,
    JoinGroupRequestProtocolCollection protocols
) {
    final boolean supported = group.supportsClassicProtocols(
        protocolType,
        ClassicGroupMember.plainProtocolSet(protocols)
    );
    if (supported) {
        return;
    }
    throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported.");
}
/**
 * Deserializes the subscription embedded in a JoinGroupRequestProtocolCollection.
 * All the protocols are expected to carry the same subscription, so only the first
 * entry returned by the collection's iterator is decoded.
 *
 * @param protocols The JoinGroupRequestProtocolCollection; must contain at least one protocol.
 * @return The Subscription.
 *
 * @throws IllegalStateException if the metadata of the first protocol cannot be parsed;
 *         the underlying SchemaException is attached as the cause.
 */
private static ConsumerPartitionAssignor.Subscription deserializeSubscription(
    JoinGroupRequestProtocolCollection protocols
) {
    try {
        return ConsumerProtocol.deserializeSubscription(
            ByteBuffer.wrap(protocols.iterator().next().metadata())
        );
    } catch (SchemaException e) {
        // Keep the original exception as the cause so the parse failure can be diagnosed.
        throw new IllegalStateException("Malformed embedded consumer protocol.", e);
    }
}
/**
 * Groups a flat list of TopicPartition by topic id, resolving each topic name
 * through the given topics image. Partitions whose topic name is not present
 * in the image are left out of the result.
 *
 * @param partitions  The partitions to convert.
 * @param topicsImage The topics image used to look up topics by name.
 *
 * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list converted from the TopicPartitions list.
 */
private static List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPartitions(
    List<TopicPartition> partitions,
    TopicsImage topicsImage
) {
    final Map<Uuid, ConsumerGroupHeartbeatRequestData.TopicPartitions> byTopicId = new HashMap<>();
    for (TopicPartition partition : partitions) {
        final TopicImage image = topicsImage.getTopic(partition.topic());
        if (image == null) {
            // Unknown topic name: nothing to report for this partition.
            continue;
        }
        ConsumerGroupHeartbeatRequestData.TopicPartitions entry = byTopicId.computeIfAbsent(
            image.id(),
            id -> new ConsumerGroupHeartbeatRequestData.TopicPartitions().setTopicId(id)
        );
        entry.partitions().add(partition.partition());
    }
    return new ArrayList<>(byTopicId.values());
}
private ConsumerGroupHeartbeatResponseData.Assignment createResponseAssignment(
ConsumerGroupMember member
) {
@ -1266,8 +1328,11 @@ public class GroupMetadataManager {
staticMemberReplaced = true;
updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
.setAssignedPartitions(member.assignedPartitions());
removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId);
// Remove the member without canceling its timers in case the change is reverted. If the
// change is not reverted, the group validation will fail and the timer will do nothing.
removeMember(records, groupId, member.memberId());
log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " +
"Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId());
}
} else {
throwIfStaticMemberIsUnknown(member, instanceId);
@ -1293,24 +1358,15 @@ public class GroupMetadataManager {
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
.setClientId(clientId)
.setClientHost(clientHost)
.setClassicMemberMetadata(null)
.build();
boolean bumpGroupEpoch = false;
if (!updatedMember.equals(member)) {
records.add(newMemberSubscriptionRecord(groupId, updatedMember));
if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.",
groupId, memberId, updatedMember.subscribedTopicNames());
bumpGroupEpoch = true;
}
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.",
groupId, memberId, updatedMember.subscribedTopicRegex());
bumpGroupEpoch = true;
}
}
boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
@ -1345,43 +1401,16 @@ public class GroupMetadataManager {
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
String preferredServerAssignor = group.computePreferredServerAssignor(
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
member,
updatedMember
).orElse(defaultAssignor.name());
try {
TargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
.withTargetAssignment(group.targetAssignment())
.addOrUpdateMember(memberId, updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
// A new static member is replacing an older one with the same subscriptions.
// We just need to remove the older member and add the newer one. The new member should
// reuse the target assignment of the older member.
if (staticMemberReplaced) {
assignmentResult = assignmentResultBuilder
.removeMember(member.memberId())
.build();
} else {
assignmentResult = assignmentResultBuilder
.build();
}
log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.",
groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment());
records.addAll(assignmentResult.records());
targetAssignment = assignmentResult.targetAssignment().get(memberId);
targetAssignmentEpoch = groupEpoch;
} catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
groupEpoch, ex.getMessage());
log.error("[GroupId {}] {}.", groupId, msg);
throw new UnknownServerException(msg, ex);
}
updatedMember,
subscriptionMetadata,
staticMemberReplaced,
records
);
targetAssignmentEpoch = groupEpoch;
}
// 3. Reconcile the member's assignment with the target assignment if the member is not
@ -1418,6 +1447,227 @@ public class GroupMetadataManager {
return new CoordinatorResult<>(records, response);
}
/**
 * Handle a JoinGroupRequest to a ConsumerGroup.
 *
 * The response future is completed in one of two ways by this method:
 * immediately with MEMBER_ID_REQUIRED when a dynamic member joins without a member id
 * on an API version that requires a known member id, or after the generated records
 * have been appended successfully, at which point the session and sync timers are
 * also scheduled for the member.
 *
 * @param group The group to join.
 * @param context The request context.
 * @param request The actual JoinGroup request.
 * @param responseFuture The join group response future.
 *
 * @return The result that contains records to append if the join group phase completes.
 *
 * @throws ApiException if one of the validation helpers rejects the request, e.g.
 *         INCONSISTENT_GROUP_PROTOCOL when the member's protocols are not supported.
 */
private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
ConsumerGroup group,
RequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) throws ApiException {
final long currentTimeMs = time.milliseconds();
final List<Record> records = new ArrayList<>();
final String groupId = request.groupId();
final String instanceId = request.groupInstanceId();
final JoinGroupRequestProtocolCollection protocols = request.protocols();
String memberId = request.memberId();
final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
// A member joining without an id is given a freshly generated one.
if (isUnknownMember) memberId = Uuid.randomUuid().toString();
throwIfConsumerGroupIsFull(group, memberId);
throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
// Get or create the member.
ConsumerGroupMember member;
ConsumerGroupMember.Builder updatedMemberBuilder;
boolean staticMemberReplaced = false;
if (instanceId == null) {
// A dynamic member (re-)joins.
if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
// If member id required, send back a response to call for another join group request with allocated member id.
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(memberId)
.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
);
log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. " +
"Created a new member id {} and requesting the member to rejoin with this id.", groupId, memberId);
// Nothing is written for this request; the member has not been created yet.
return EMPTY_RESULT;
} else {
member = group.getOrMaybeCreateMember(memberId, true);
log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
}
} else {
member = group.staticMember(instanceId);
// A new static member joins or the existing static member rejoins.
if (isUnknownMember) {
if (member == null) {
// New static member.
member = group.getOrMaybeCreateMember(memberId, true);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId);
} else {
// Replace the current static member. The new member starts from the assigned
// partitions of the member it replaces.
staticMemberReplaced = true;
updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
.setAssignedPartitions(member.assignedPartitions());
// Remove the member without canceling its timers in case the change is reverted. If the
// change is not reverted, the group validation will fail and the timer will do nothing.
removeMember(records, groupId, member.memberId());
log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " +
"Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId());
}
} else {
// Rejoining static member. Fence the static group with unmatched member id.
throwIfStaticMemberIsUnknown(member, instanceId);
throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId);
}
}
int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
// The subscription is decoded from the first protocol in the request; the owned partitions it
// reports are converted to topic-id form, dropping topics that are not in the metadata image.
final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols);
final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions =
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics());
// 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue
// record is written to the __consumer_offsets partition to persist the change. If the subscriptions have
// changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue
// record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
// changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition.
ConsumerGroupMember updatedMember = updatedMemberBuilder
.maybeUpdateInstanceId(Optional.ofNullable(instanceId))
.maybeUpdateRackId(subscription.rackId())
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
.maybeUpdateServerAssignorName(Optional.empty())
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
.setClientId(context.clientId())
.setClientHost(context.clientAddress.toString())
.setSupportedClassicProtocols(protocols)
.build();
boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
// 2) The refresh deadline has been reached.
subscriptionMetadata = group.computeSubscriptionMetadata(
member,
updatedMember,
metadataImage.topics(),
metadataImage.cluster()
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
bumpGroupEpoch = true;
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
// replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
member,
updatedMember,
subscriptionMetadata,
staticMemberReplaced,
records
);
targetAssignmentEpoch = groupEpoch;
}
// 3. Reconcile the member's assignment with the target assignment if the member is not
// fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
ownedTopicPartitions,
records
);
// The member epoch is returned as the generation id, and the name of the first protocol
// in the request is returned as the protocol name.
final JoinGroupResponseData response = new JoinGroupResponseData()
.setMemberId(updatedMember.memberId())
.setGenerationId(updatedMember.memberEpoch())
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setProtocolName(protocols.iterator().next().name());
CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.whenComplete((__, t) -> {
// Timers are scheduled and the response is completed only when the append succeeded.
// NOTE(review): on failure (t != null) nothing is done here; presumably the caller
// completes responseFuture with the error - confirm.
if (t == null) {
scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), request.sessionTimeoutMs());
// The sync timeout ensures that the member sends its sync request within the rebalance timeout.
scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
responseFuture.complete(response);
}
});
return new CoordinatorResult<>(records, null, appendFuture, true);
}
/**
 * Creates the member subscription record if the updatedMember is different from
 * the old member. Returns true if the subscribedTopicNames/subscribedTopicRegex
 * has changed.
 *
 * Both the topic-name change and the regex change are logged when they happen in
 * the same update, so that neither is hidden by the other.
 *
 * @param groupId       The group id.
 * @param member        The old member.
 * @param updatedMember The updated member.
 * @param records       The list to accumulate any new records.
 * @return A boolean indicating whether the updatedMember has a different
 *         subscribedTopicNames/subscribedTopicRegex from the old member.
 */
private boolean hasMemberSubscriptionChanged(
    String groupId,
    ConsumerGroupMember member,
    ConsumerGroupMember updatedMember,
    List<Record> records
) {
    if (updatedMember.equals(member)) {
        // Nothing changed: no record to write and no subscription change to report.
        return false;
    }

    final String memberId = updatedMember.memberId();
    records.add(newMemberSubscriptionRecord(groupId, updatedMember));

    boolean subscriptionChanged = false;
    if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
        log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.",
            groupId, memberId, updatedMember.subscribedTopicNames());
        subscriptionChanged = true;
    }
    // Checked independently of the topic names so a simultaneous regex change is logged too.
    if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
        log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.",
            groupId, memberId, updatedMember.subscribedTopicRegex());
        subscriptionChanged = true;
    }
    return subscriptionChanged;
}
/**
* Reconciles the current assignment of the member towards the target assignment if needed.
*
@ -1461,30 +1711,83 @@ public class GroupMetadataManager {
groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
assignmentToString(updatedMember.assignedPartitions()), assignmentToString(updatedMember.partitionsPendingRevocation()));
if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
scheduleConsumerGroupRebalanceTimeout(
groupId,
updatedMember.memberId(),
updatedMember.memberEpoch(),
updatedMember.rebalanceTimeoutMs()
);
} else {
cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId());
// Schedule/cancel the rebalance timeout if the member uses the consumer protocol.
// The members using classic protocol only have join timer and sync timer.
if (!updatedMember.useClassicProtocol()) {
if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
scheduleConsumerGroupRebalanceTimeout(
groupId,
updatedMember.memberId(),
updatedMember.memberEpoch(),
updatedMember.rebalanceTimeoutMs()
);
} else {
cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId());
}
}
}
return updatedMember;
}
private void removeMemberAndCancelTimers(
List<Record> records,
String groupId,
String memberId
/**
* Updates the target assignment according to the updated member and subscription metadata.
*
* @param group The ConsumerGroup.
* @param groupEpoch The group epoch.
* @param member The existing member.
* @param updatedMember The updated member.
* @param subscriptionMetadata The subscription metadata.
* @param staticMemberReplaced The boolean indicating whether the updated member
* is a static member that replaces the existing member.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
private Assignment updateTargetAssignment(
ConsumerGroup group,
int groupEpoch,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
Map<String, TopicMetadata> subscriptionMetadata,
boolean staticMemberReplaced,
List<Record> records
) {
// Write tombstones for the departed static member.
removeMember(records, groupId, memberId);
// Cancel all the timers of the departed static member.
cancelTimers(groupId, memberId);
String preferredServerAssignor = group.computePreferredServerAssignor(
member,
updatedMember
).orElse(defaultAssignor.name());
try {
TargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder(group.groupId(), groupEpoch, assignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
.withTargetAssignment(group.targetAssignment())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
// A new static member is replacing an older one with the same subscriptions.
// We just need to remove the older member and add the newer one. The new member should
// reuse the target assignment of the older member.
if (staticMemberReplaced) {
assignmentResult = assignmentResultBuilder
.removeMember(member.memberId())
.build();
} else {
assignmentResult = assignmentResultBuilder
.build();
}
log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.",
group.groupId(), groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment());
records.addAll(assignmentResult.records());
return assignmentResult.targetAssignment().get(updatedMember.memberId());
} catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
groupEpoch, ex.getMessage());
log.error("[GroupId {}] {}.", group.groupId(), msg);
throw new UnknownServerException(msg, ex);
}
}
/**
@ -1623,6 +1926,7 @@ public class GroupMetadataManager {
private void cancelTimers(String groupId, String memberId) {
// Cancel each kind of per-member timer: session, rebalance and sync.
cancelConsumerGroupSessionTimeout(groupId, memberId);
cancelConsumerGroupRebalanceTimeout(groupId, memberId);
// The sync timeout is the one scheduled for members that join through the classic JoinGroup path.
cancelConsumerGroupSyncTimeout(groupId, memberId);
}
/**
@ -1634,9 +1938,24 @@ public class GroupMetadataManager {
private void scheduleConsumerGroupSessionTimeout(
String groupId,
String memberId
) {
// Delegate to the three-argument overload, passing consumerGroupSessionTimeoutMs as the timeout.
scheduleConsumerGroupSessionTimeout(groupId, memberId, consumerGroupSessionTimeoutMs);
}
/**
* Schedules (or reschedules) the session timeout for the member.
*
* @param groupId The group id.
* @param memberId The member id.
* @param sessionTimeoutMs The session timeout.
*/
private void scheduleConsumerGroupSessionTimeout(
String groupId,
String memberId,
int sessionTimeoutMs
) {
String key = consumerGroupSessionTimeoutKey(groupId, memberId);
timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
timer.schedule(key, sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
try {
ConsumerGroup group = consumerGroup(groupId);
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
@ -1725,6 +2044,52 @@ public class GroupMetadataManager {
timer.cancel(consumerGroupRebalanceTimeoutKey(groupId, memberId));
}
/**
 * Schedules a sync timeout for the member. When the timer fires, the member is
 * fenced from the group if both the group and the member still exist; otherwise
 * the expiration produces an empty result.
 *
 * @param groupId               The group id.
 * @param memberId              The member id.
 * @param rebalanceTimeoutMs    The rebalance timeout, used as the timer delay in milliseconds.
 */
private void scheduleConsumerGroupSyncTimeout(
    String groupId,
    String memberId,
    int rebalanceTimeoutMs
) {
    String key = consumerGroupSyncKey(groupId, memberId);
    timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
        try {
            ConsumerGroup group = consumerGroup(groupId);
            ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
            // This is the sync timer, not the session timer: report the actual reason.
            log.info("[GroupId {}] Member {} fenced from the group because it failed to sync within the rebalance timeout.",
                groupId, memberId);
            return consumerGroupFenceMember(group, member, null);
        } catch (GroupIdNotFoundException ex) {
            log.debug("[GroupId {}] Could not fence {} because the group does not exist.",
                groupId, memberId);
        } catch (UnknownMemberIdException ex) {
            log.debug("[GroupId {}] Could not fence {} because the member does not exist.",
                groupId, memberId);
        }
        // Group or member is gone: there is nothing to write.
        return new CoordinatorResult<>(Collections.emptyList());
    });
}
/**
 * Cancels the timer registered under the member's sync key.
 *
 * @param groupId       The group id.
 * @param memberId      The member id.
 */
private void cancelConsumerGroupSyncTimeout(
    String groupId,
    String memberId
) {
    final String syncTimeoutKey = consumerGroupSyncKey(groupId, memberId);
    timer.cancel(syncTimeoutKey);
}
/**
* Handles a ConsumerGroupHeartbeat request.
*
@ -2230,81 +2595,95 @@ public class GroupMetadataManager {
RequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) {
Group group = groups.get(request.groupId(), Long.MAX_VALUE);
if (group != null && group.type() == CONSUMER && !group.isEmpty()) {
// classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups.
// The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup.
return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture);
} else {
return classicGroupJoinToClassicGroup(context, request, responseFuture);
}
}
/**
* Handle a JoinGroupRequest to a ClassicGroup or a group to be created.
*
* @param context The request context.
* @param request The actual JoinGroup request.
* @param responseFuture The join group response future.
*
* @return The result that contains records to append if the join group phase completes.
*/
CoordinatorResult<Void, Record> classicGroupJoinToClassicGroup(
RequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) {
CoordinatorResult<Void, Record> result = EMPTY_RESULT;
List<Record> records = new ArrayList<>();
String groupId = request.groupId();
String memberId = request.memberId();
int sessionTimeoutMs = request.sessionTimeoutMs();
if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
sessionTimeoutMs > classicGroupMaxSessionTimeoutMs
) {
boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
// Group is created if it does not exist and the member id is UNKNOWN. if member
// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
ClassicGroup group;
maybeDeleteEmptyConsumerGroup(groupId, records);
boolean isNewGroup = !groups.containsKey(groupId);
try {
group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember);
} catch (Throwable t) {
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(memberId)
.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
.setErrorCode(Errors.forException(t).code())
);
return EMPTY_RESULT;
}
if (!acceptJoiningMember(group, memberId)) {
group.remove(memberId);
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(UNKNOWN_MEMBER_ID)
.setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
);
} else if (isUnknownMember) {
result = classicGroupJoinNewMember(
context,
request,
group,
responseFuture
);
} else {
boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
// Group is created if it does not exist and the member id is UNKNOWN. if member
// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
ClassicGroup group;
maybeDeleteEmptyConsumerGroup(groupId, records);
boolean isNewGroup = !groups.containsKey(groupId);
try {
group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember);
} catch (Throwable t) {
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(memberId)
.setErrorCode(Errors.forException(t).code())
);
return EMPTY_RESULT;
}
result = classicGroupJoinExistingMember(
context,
request,
group,
responseFuture
);
}
if (!acceptJoiningMember(group, memberId)) {
group.remove(memberId);
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(UNKNOWN_MEMBER_ID)
.setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
);
} else if (isUnknownMember) {
result = classicGroupJoinNewMember(
context,
request,
group,
responseFuture
);
} else {
result = classicGroupJoinExistingMember(
context,
request,
group,
responseFuture
);
}
if (isNewGroup && result == EMPTY_RESULT) {
// If there are no records to append and if a group was newly created, we need to append
// records to the log to commit the group to the timeline data structure.
CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.whenComplete((__, t) -> {
if (t != null) {
// We failed to write the empty group metadata. This will revert the snapshot, removing
// the newly created group.
log.warn("Failed to write empty metadata for group {}: {}", group.groupId(), t.getMessage());
if (isNewGroup && result == EMPTY_RESULT) {
// If there are no records to append and if a group was newly created, we need to append
// records to the log to commit the group to the timeline data structure.
CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.whenComplete((__, t) -> {
if (t != null) {
// We failed to write the empty group metadata. This will revert the snapshot, removing
// the newly created group.
log.warn("Failed to write empty metadata for group {}: {}", group.groupId(), t.getMessage());
responseFuture.complete(new JoinGroupResponseData()
.setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(t)).code()));
}
});
responseFuture.complete(new JoinGroupResponseData()
.setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(t)).code()));
}
});
records.add(
RecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion())
);
records.add(
RecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion())
);
return new CoordinatorResult<>(records, appendFuture, false);
}
return new CoordinatorResult<>(records, appendFuture, false);
}
return result;
}
@ -3975,4 +4354,18 @@ public class GroupMetadataManager {
static String classicGroupSyncKey(String groupId) {
    // Produces "sync-" followed by the group id.
    return String.join("-", "sync", groupId);
}
/**
 * Builds the timer key used for a consumer group member's sync timeout, in the
 * form {@code sync-<groupId>-<memberId>}.
 *
 * Package private for testing.
 *
 * @param groupId   The group id.
 * @param memberId  The member id.
 *
 * @return the sync key.
 */
static String consumerGroupSyncKey(String groupId, String memberId) {
    return String.join("-", "sync", groupId, memberId);
}
}

View File

@ -1213,14 +1213,16 @@ public class ConsumerGroup implements Group {
* @return A boolean based on the condition mentioned above.
*/
public boolean supportsClassicProtocols(String memberProtocolType, Set<String> memberProtocols) {
if (isEmpty()) {
return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
} else {
return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
memberProtocols.stream().anyMatch(
if (ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType)) {
if (isEmpty()) {
return !memberProtocols.isEmpty();
} else {
return memberProtocols.stream().anyMatch(
name -> classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == numClassicProtocolMembers()
);
}
}
return false;
}
/**

View File

@ -147,24 +147,10 @@ public class CurrentAssignmentBuilder {
// considered revoked when they are not anymore reported in the
// owned partitions set in the ConsumerGroupHeartbeat API.
// If the member does not provide its owned partitions. We cannot
// progress.
if (ownedTopicPartitions == null) {
return member;
}
// If the member provides its owned partitions. We verify if it still
// owns any of the revoked partitions. If it does, we cannot progress.
for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
for (Integer partitionId : topicPartitions.partitions()) {
boolean stillHasRevokedPartition = member
.partitionsPendingRevocation()
.getOrDefault(topicPartitions.topicId(), Collections.emptySet())
.contains(partitionId);
if (stillHasRevokedPartition) {
return member;
}
}
if (ownsRevokedPartitions(member.partitionsPendingRevocation())) {
return member;
}
// When the member has revoked all the pending partitions, it can
@ -203,6 +189,31 @@ public class CurrentAssignmentBuilder {
return member;
}
/**
 * Tells whether the member may still own one of the given partitions.
 *
 * When no owned partitions were provided ({@code ownedTopicPartitions} is null) the
 * answer is always {@code true}. Otherwise the answer is {@code true} only if at least
 * one owned partition also appears in {@code assignment} under the same topic id.
 *
 * @param assignment The assignment that has the partitions pending revocation.
 * @return A boolean based on the condition mentioned above.
 */
private boolean ownsRevokedPartitions(
    Map<Uuid, Set<Integer>> assignment
) {
    if (ownedTopicPartitions == null) {
        return true;
    }

    return ownedTopicPartitions.stream().anyMatch(owned -> {
        Set<Integer> pendingForTopic =
            assignment.getOrDefault(owned.topicId(), Collections.emptySet());
        return owned.partitions().stream()
            .anyMatch(partitionId -> pendingForTopic.contains(partitionId));
    });
}
/**
* Computes the next assignment.
*
@ -257,7 +268,7 @@ public class CurrentAssignmentBuilder {
}
}
if (!newPartitionsPendingRevocation.isEmpty()) {
if (!newPartitionsPendingRevocation.isEmpty() && ownsRevokedPartitions(newPartitionsPendingRevocation)) {
// If there are partitions to be revoked, the member remains in its current
// epoch and requests the revocation of those partitions. It transitions to
// the UNREVOKED_PARTITIONS state to wait until the client acknowledges the

View File

@ -164,9 +164,9 @@ public class Assertions {
// The order of the racks stored in the PartitionMetadata of the ConsumerGroupPartitionMetadataValue
// is not always guaranteed. Therefore, we need a special comparator.
ConsumerGroupPartitionMetadataValue expectedValue =
(ConsumerGroupPartitionMetadataValue) expected.message();
(ConsumerGroupPartitionMetadataValue) expected.message().duplicate();
ConsumerGroupPartitionMetadataValue actualValue =
(ConsumerGroupPartitionMetadataValue) actual.message();
(ConsumerGroupPartitionMetadataValue) actual.message().duplicate();
List<ConsumerGroupPartitionMetadataValue.TopicMetadata> expectedTopicMetadataList =
expectedValue.topics();
@ -177,6 +177,9 @@ public class Assertions {
fail("Topic metadata lists have different sizes");
}
expectedTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
actualTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
for (int i = 0; i < expectedTopicMetadataList.size(); i++) {
ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata =
expectedTopicMetadataList.get(i);

View File

@ -372,7 +372,8 @@ public class GroupCoordinatorServiceTest {
);
JoinGroupRequestData request = new JoinGroupRequestData()
.setGroupId("foo");
.setGroupId("foo")
.setSessionTimeoutMs(1000);
service.startup(() -> 1);
@ -405,7 +406,8 @@ public class GroupCoordinatorServiceTest {
);
JoinGroupRequestData request = new JoinGroupRequestData()
.setGroupId("foo");
.setGroupId("foo")
.setSessionTimeoutMs(1000);
service.startup(() -> 1);
@ -501,6 +503,38 @@ public class GroupCoordinatorServiceTest {
);
}
@ParameterizedTest
// NOTE(review): the two values are presumably just below the minimum and just above the
// maximum classic session timeout produced by createConfig() — confirm against that helper.
@ValueSource(ints = {120 - 1, 10 * 5 * 1000 + 1})
public void testJoinGroupInvalidSessionTimeout(int sessionTimeoutMs) throws Exception {
    // Start a service backed by a mocked runtime.
    CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = mockRuntime();
    GroupCoordinatorConfig coordinatorConfig = createConfig();
    GroupCoordinatorService coordinator = new GroupCoordinatorService(
        new LogContext(),
        coordinatorConfig,
        mockedRuntime,
        new GroupCoordinatorMetrics()
    );
    coordinator.startup(() -> 1);

    // Join request carrying the session timeout under test.
    JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
        .withGroupId("group-id")
        .withMemberId(UNKNOWN_MEMBER_ID)
        .withSessionTimeoutMs(sessionTimeoutMs)
        .build();

    JoinGroupResponseData actualResponse = coordinator.joinGroup(
        requestContext(ApiKeys.JOIN_GROUP),
        joinRequest,
        BufferSupplier.NO_CACHING
    ).get();

    // The response is expected to carry only the INVALID_SESSION_TIMEOUT error code.
    JoinGroupResponseData expectedResponse = new JoinGroupResponseData()
        .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code());
    assertEquals(expectedResponse, actualResponse);
}
@Test
public void testSyncGroup() {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();

View File

@ -18,10 +18,12 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@ -87,6 +89,7 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESU
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSyncKey;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
@ -121,6 +124,34 @@ public class GroupMetadataManagerTestContext {
return protocols;
}
/**
 * Builds a single-entry protocol collection for the given subscription, serialized
 * with the highest supported ConsumerProtocolSubscription version.
 *
 * @param topicNames        The subscribed topic names.
 * @param ownedPartitions   The partitions the member reports as owned.
 * @return the protocol collection produced by the three-argument overload.
 */
public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toConsumerProtocol(
    List<String> topicNames,
    List<TopicPartition> ownedPartitions
) {
    short latestVersion = ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION;
    return toConsumerProtocol(topicNames, ownedPartitions, latestVersion);
}
/**
 * Builds a protocol collection holding one protocol named "range" whose metadata is the
 * given subscription serialized at the requested version.
 *
 * @param topicNames        The subscribed topic names.
 * @param ownedPartitions   The partitions the member reports as owned.
 * @param version           The subscription version used for serialization.
 * @return a collection containing the single "range" protocol.
 */
public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toConsumerProtocol(
    List<String> topicNames,
    List<TopicPartition> ownedPartitions,
    short version
) {
    // The second constructor argument of the subscription is left null.
    ConsumerPartitionAssignor.Subscription subscription =
        new ConsumerPartitionAssignor.Subscription(topicNames, null, ownedPartitions);
    byte[] serializedSubscription =
        ConsumerProtocol.serializeSubscription(subscription, version).array();

    JoinGroupRequestData.JoinGroupRequestProtocol rangeProtocol =
        new JoinGroupRequestData.JoinGroupRequestProtocol()
            .setName("range")
            .setMetadata(serializedSubscription);

    JoinGroupRequestData.JoinGroupRequestProtocolCollection result =
        new JoinGroupRequestData.JoinGroupRequestProtocolCollection(0);
    result.add(rangeProtocol);
    return result;
}
public static Record newGroupMetadataRecord(
String groupId,
GroupMetadataValue value,
@ -586,6 +617,27 @@ public class GroupMetadataManagerTestContext {
assertNull(timeout);
}
/**
 * Asserts that a timeout is registered under the consumer group sync key of the given
 * member and that its deadline equals the current time plus {@code delayMs}.
 *
 * @param groupId   The group id.
 * @param memberId  The member id.
 * @param delayMs   The expected delay, relative to the current time.
 * @return the scheduled timeout that was found.
 */
public MockCoordinatorTimer.ScheduledTimeout<Void, Record> assertSyncTimeout(
    String groupId,
    String memberId,
    long delayMs
) {
    String syncKey = consumerGroupSyncKey(groupId, memberId);
    MockCoordinatorTimer.ScheduledTimeout<Void, Record> scheduled = timer.timeout(syncKey);
    assertNotNull(scheduled);

    long expectedDeadlineMs = time.milliseconds() + delayMs;
    assertEquals(expectedDeadlineMs, scheduled.deadlineMs);
    return scheduled;
}
/**
 * Asserts that no timeout is registered under the consumer group sync key of the
 * given member.
 *
 * @param groupId   The group id.
 * @param memberId  The member id.
 */
public void assertNoSyncTimeout(
    String groupId,
    String memberId
) {
    String syncKey = consumerGroupSyncKey(groupId, memberId);
    assertNull(timer.timeout(syncKey));
}
ClassicGroup createClassicGroup(String groupId) {
    // NOTE(review): the `true` flag presumably asks the manager to create the group
    // when it does not exist yet — confirm against getOrMaybeCreateClassicGroup.
    ClassicGroup group = groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, true);
    return group;
}
@ -642,6 +694,10 @@ public class GroupMetadataManagerTestContext {
responseFuture
);
if (coordinatorResult.replayRecords()) {
coordinatorResult.records().forEach(this::replay);
}
return new JoinResult(responseFuture, coordinatorResult);
}
@ -789,6 +845,10 @@ public class GroupMetadataManagerTestContext {
responseFuture
);
if (coordinatorResult.replayRecords()) {
coordinatorResult.records().forEach(this::replay);
}
return new SyncResult(responseFuture, coordinatorResult);
}

View File

@ -16,12 +16,17 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class MockPartitionAssignor implements PartitionAssignor {
private final String name;
private GroupAssignment prepareGroupAssignment = null;
@ -43,4 +48,9 @@ public class MockPartitionAssignor implements PartitionAssignor {
public GroupAssignment assign(
    AssignmentSpec assignmentSpec,
    SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
    // Both arguments are ignored; the stored assignment (possibly null) is returned as is.
    return this.prepareGroupAssignment;
}
/**
 * Returns the target partitions of one member in the prepared group assignment.
 *
 * @param memberId The member id to look up in the prepared assignment.
 * @return the member's target partitions, keyed by topic id.
 * @throws NullPointerException if no group assignment has been prepared, or if the
 *                              prepared assignment has no entry for {@code memberId}.
 */
public Map<Uuid, Set<Integer>> targetPartitions(String memberId) {
    Objects.requireNonNull(prepareGroupAssignment, "No group assignment has been prepared");
    // Fail with a descriptive message instead of an anonymous NPE when the member is unknown.
    return Objects.requireNonNull(
        prepareGroupAssignment.members().get(memberId),
        "No prepared assignment for member " + memberId
    ).targetPartitions();
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * A partition assignor that changes nothing: every member in the spec is assigned
 * exactly the partitions its spec reports through {@code assignedPartitions()}.
 */
public class NoOpPartitionAssignor implements PartitionAssignor {
    /** The name under which this assignor is reported. */
    static final String NAME = "no-op";

    @Override
    public String name() {
        return NAME;
    }

    @Override
    public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
        // Map each member id to an assignment wrapping its currently assigned partitions.
        Map<String, MemberAssignment> unchangedAssignments = assignmentSpec.members()
            .entrySet()
            .stream()
            .collect(Collectors.toMap(
                memberEntry -> memberEntry.getKey(),
                memberEntry -> new MemberAssignment(memberEntry.getValue().assignedPartitions())
            ));
        return new GroupAssignment(unchangedAssignments);
    }
}

View File

@ -170,6 +170,43 @@ public class CurrentAssignmentBuilderTest {
);
}
@Test
public void testStableToUnreleasedPartitionsWithOwnedPartitionsNotHavingRevokedPartitions() {
    Uuid firstTopicId = Uuid.randomUuid();
    Uuid secondTopicId = Uuid.randomUuid();

    // A stable member at epoch 10 holding partitions 1-3 of the first topic and 4-6 of the second.
    ConsumerGroupMember stableMember = new ConsumerGroupMember.Builder("member")
        .setState(MemberState.STABLE)
        .setMemberEpoch(10)
        .setPreviousMemberEpoch(10)
        .setAssignedPartitions(mkAssignment(
            mkTopicAssignment(firstTopicId, 1, 2, 3),
            mkTopicAssignment(secondTopicId, 4, 5, 6)))
        .build();

    // The target at epoch 11 swaps partition 6 of the second topic for partition 7.
    Assignment targetAssignment = new Assignment(mkAssignment(
        mkTopicAssignment(firstTopicId, 1, 2, 3),
        mkTopicAssignment(secondTopicId, 4, 5, 7)));

    // Every partition of the second topic reports epoch 10 and all others report -1.
    // The member reports an empty owned-partitions list.
    ConsumerGroupMember actualMember = new CurrentAssignmentBuilder(stableMember)
        .withTargetAssignment(11, targetAssignment)
        .withCurrentPartitionEpoch((topicId, partitionId) ->
            secondTopicId.equals(topicId) ? 10 : -1
        )
        .withOwnedTopicPartitions(Collections.emptyList())
        .build();

    // Expected: the member moves to epoch 11 in UNRELEASED_PARTITIONS, keeping only the
    // partitions common to the old and the target assignment.
    ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder("member")
        .setState(MemberState.UNRELEASED_PARTITIONS)
        .setMemberEpoch(11)
        .setPreviousMemberEpoch(10)
        .setAssignedPartitions(mkAssignment(
            mkTopicAssignment(firstTopicId, 1, 2, 3),
            mkTopicAssignment(secondTopicId, 4, 5)))
        .build();

    assertEquals(expectedMember, actualMember);
}
@Test
public void testUnrevokedPartitionsToStable() {
Uuid topicId1 = Uuid.randomUuid();