MINOR: Refactor GroupMetadataManager#consumerGroupHeartbeat and GroupMetadataManager#classicGroupJoinToConsumerGroup (#16371)

This patch is an attempt to simplifying GroupMetadataManager#consumerGroupHeartbeat and GroupMetadataManager#classicGroupJoinToConsumerGroup by sharing more of the common logic. It slightly change how static members are replaced too. Now, we generate the records to replace the member and then we update the member if needed.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2024-07-01 08:16:52 +02:00 committed by GitHub
parent 15a4501bde
commit 9a78122fb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 521 additions and 135 deletions

View File

@ -101,6 +101,30 @@ public class JoinGroupRequest extends AbstractRequest {
return apiVersion >= 4;
}
/**
* Since JoinGroupRequest version 4, a client that sends a join group request with
* {@link #UNKNOWN_MEMBER_ID} needs to rejoin with a new member id generated
* by the server. Once the second join group request is complete, the client is
* added as a new member of the group.
*
* Prior to version 4, a client is immediately added as a new member if it sends a
* join group request with UNKNOWN_MEMBER_ID.
*
* @param request The request.
* @param apiVersion The JoinGroupRequest api version.
*
* @return whether a known member id is required or not.
*/
public static boolean requiresKnownMemberId(
JoinGroupRequestData request,
short apiVersion
) {
return request.groupInstanceId() == null
&& request.memberId().equals(UNKNOWN_MEMBER_ID)
&& requiresKnownMemberId(apiVersion);
}
/**
* Starting from version 9 of the JoinGroup API, static members are able to
* skip running the assignor based on the `SkipAssignment` field. We leverage

View File

@ -1366,45 +1366,27 @@ public class GroupMetadataManager {
// Get or create the member.
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
ConsumerGroupMember member;
ConsumerGroupMember.Builder updatedMemberBuilder;
boolean staticMemberReplaced = false;
final ConsumerGroupMember member;
if (instanceId == null) {
member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
if (createIfNotExists) {
log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
}
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
member = getOrMaybeSubscribeDynamicConsumerGroupMember(
group,
memberId,
memberEpoch,
ownedTopicPartitions,
createIfNotExists,
false
);
} else {
member = group.staticMember(instanceId);
if (memberEpoch == 0) {
// A new static member joins or the existing static member rejoins.
if (member == null) {
// New static member.
member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId);
} else {
// Static member rejoins with a different member id so it should replace
// the previous instance iff the previous member had sent a leave group.
throwIfInstanceIdIsUnreleased(member, groupId, memberId, instanceId);
// Replace the current member.
staticMemberReplaced = true;
updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
.setAssignedPartitions(member.assignedPartitions());
// Remove the member without canceling its timers in case the change is reverted. If the
// change is not reverted, the group validation will fail and the timer will do nothing.
removeMember(records, groupId, member.memberId());
log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " +
"Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId());
}
} else {
throwIfStaticMemberIsUnknown(member, instanceId);
throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
}
member = getOrMaybeSubscribeStaticConsumerGroupMember(
group,
memberId,
memberEpoch,
instanceId,
ownedTopicPartitions,
createIfNotExists,
false,
records
);
}
// 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue
@ -1412,7 +1394,7 @@ public class GroupMetadataManager {
// changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue
// record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
// changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition.
ConsumerGroupMember updatedMember = updatedMemberBuilder
ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
.maybeUpdateInstanceId(Optional.ofNullable(instanceId))
.maybeUpdateRackId(Optional.ofNullable(rackId))
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
@ -1447,7 +1429,7 @@ public class GroupMetadataManager {
);
int numMembers = group.numMembers();
if (!group.hasMember(updatedMember.memberId()) && !staticMemberReplaced) {
if (!group.hasMember(updatedMember.memberId()) && !group.hasStaticMember(updatedMember.instanceId())) {
numMembers++;
}
@ -1473,11 +1455,12 @@ public class GroupMetadataManager {
group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
// replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
// the existing and the new target assignment is persisted to the partition.
final int targetAssignmentEpoch;
final Assignment targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
@ -1485,10 +1468,12 @@ public class GroupMetadataManager {
updatedMember,
subscriptionMetadata,
subscriptionType,
staticMemberReplaced,
records
);
targetAssignmentEpoch = groupEpoch;
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
}
// 3. Reconcile the member's assignment with the target assignment if the member is not
@ -1555,53 +1540,40 @@ public class GroupMetadataManager {
throwIfConsumerGroupIsFull(group, memberId);
throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
if (JoinGroupRequest.requiresKnownMemberId(request, context.apiVersion())) {
// A dynamic member requiring a member id joins the group. Send back a response to call for another
// join group request with allocated member id.
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(memberId)
.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
);
log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. " +
"Created a new member id {} and requesting the member to rejoin with this id.", groupId, memberId);
return EMPTY_RESULT;
}
// Get or create the member.
ConsumerGroupMember member;
ConsumerGroupMember.Builder updatedMemberBuilder;
boolean staticMemberReplaced = false;
final ConsumerGroupMember member;
if (instanceId == null) {
// A dynamic member (re-)joins.
if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
// If member id required, send back a response to call for another join group request with allocated member id.
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(memberId)
.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
);
log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. " +
"Created a new member id {} and requesting the member to rejoin with this id.", groupId, memberId);
return EMPTY_RESULT;
} else {
member = group.getOrMaybeCreateMember(memberId, true);
log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
}
member = getOrMaybeSubscribeDynamicConsumerGroupMember(
group,
memberId,
-1,
Collections.emptyList(),
true,
true
);
} else {
member = group.staticMember(instanceId);
// A new static member joins or the existing static member rejoins.
if (isUnknownMember) {
if (member == null) {
// New static member.
member = group.getOrMaybeCreateMember(memberId, true);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId);
} else {
// Replace the current static member.
staticMemberReplaced = true;
updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
.setAssignedPartitions(member.assignedPartitions());
// Remove the member without canceling its timers in case the change is reverted. If the
// change is not reverted, the group validation will fail and the timer will do nothing.
removeMember(records, groupId, member.memberId());
log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " +
"Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId());
}
} else {
// Rejoining static member. Fence the static group with unmatched member id.
throwIfStaticMemberIsUnknown(member, instanceId);
throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId);
}
member = getOrMaybeSubscribeStaticConsumerGroupMember(
group,
memberId,
-1,
instanceId,
Collections.emptyList(),
isUnknownMember,
true,
records
);
}
int groupEpoch = group.groupEpoch();
@ -1609,15 +1581,13 @@ public class GroupMetadataManager {
Map<String, Integer> subscribedTopicNamesMap = group.subscribedTopicNames();
SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);
final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions =
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics());
// 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue
// record is written to the __consumer_offsets partition to persist the change. If the subscriptions have
// changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue
// record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
// changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition.
ConsumerGroupMember updatedMember = updatedMemberBuilder
ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
.maybeUpdateInstanceId(Optional.ofNullable(instanceId))
.maybeUpdateRackId(Utils.toOptional(subscription.rackId()))
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
@ -1628,8 +1598,7 @@ public class GroupMetadataManager {
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(sessionTimeoutMs)
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols))
)
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols)))
.build();
boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
@ -1651,7 +1620,7 @@ public class GroupMetadataManager {
);
int numMembers = group.numMembers();
if (!group.hasMember(updatedMember.memberId()) && !staticMemberReplaced) {
if (!group.hasMember(updatedMember.memberId()) && !group.hasStaticMember(updatedMember.instanceId())) {
numMembers++;
}
@ -1677,11 +1646,12 @@ public class GroupMetadataManager {
group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
// replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
// the existing and the new target assignment is persisted to the partition.
final int targetAssignmentEpoch;
final Assignment targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
@ -1689,10 +1659,13 @@ public class GroupMetadataManager {
updatedMember,
subscriptionMetadata,
subscriptionType,
staticMemberReplaced,
records
);
targetAssignmentEpoch = groupEpoch;
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
}
// 3. Reconcile the member's assignment with the target assignment if the member is not
@ -1703,7 +1676,7 @@ public class GroupMetadataManager {
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
ownedTopicPartitions,
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
records
);
@ -1728,6 +1701,105 @@ public class GroupMetadataManager {
return new CoordinatorResult<>(records, null, appendFuture, true);
}
/**
* Gets or subscribes a new dynamic consumer group member.
*
* @param group The consumer group.
* @param memberId The member id.
* @param memberEpoch The member epoch.
* @param ownedTopicPartitions The owned partitions reported by the member.
* @param createIfNotExists Whether the member should be created or not.
* @param useClassicProtocol Whether the member uses the classic protocol.
*
* @return The existing consumer group member or a new one.
*/
private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(
ConsumerGroup group,
String memberId,
int memberEpoch,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
boolean createIfNotExists,
boolean useClassicProtocol
) {
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
if (!useClassicProtocol) {
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
}
if (createIfNotExists) {
log.info("[GroupId {}] Member {} joins the consumer group using the {} protocol.",
group.groupId(), memberId, useClassicProtocol ? "classic" : "consumer");
}
return member;
}
/**
* Gets or subscribes a static consumer group member. This method also replaces the
* previous static member if allowed.
*
* @param group The consumer group.
* @param memberId The member id.
* @param memberEpoch The member epoch.
* @param instanceId The instance id.
* @param ownedTopicPartitions The owned partitions reported by the member.
* @param createIfNotExists Whether the member should be created or not.
* @param useClassicProtocol Whether the member uses the classic protocol.
* @param records The list to accumulate records created to replace
* the previous static member.
*
* @return The existing consumer group member or a new one.
*/
private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
ConsumerGroup group,
String memberId,
int memberEpoch,
String instanceId,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
boolean createIfNotExists,
boolean useClassicProtocol,
List<CoordinatorRecord> records
) {
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(instanceId);
if (createIfNotExists) {
// A new static member joins or the existing static member rejoins.
if (existingStaticMemberOrNull == null) {
// New static member.
ConsumerGroupMember newMember = group.getOrMaybeCreateMember(memberId, true);
log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group using the {} protocol.",
group.groupId(), memberId, instanceId, useClassicProtocol ? "classic" : "consumer");
return newMember;
} else {
if (!useClassicProtocol) {
// Static member rejoins with a different member id so it should replace
// the previous instance iff the previous member had sent a leave group.
throwIfInstanceIdIsUnreleased(existingStaticMemberOrNull, group.groupId(), memberId, instanceId);
}
// Copy the member but with its new member id.
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(existingStaticMemberOrNull, memberId)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
.build();
// Generate the records to replace the member.
replaceMember(records, group, existingStaticMemberOrNull, newMember);
log.info("[GroupId {}] Static member with instance id {} re-joins the consumer group " +
"using the {} protocol. Created a new member {} to replace the existing member {}.",
group.groupId(), instanceId, useClassicProtocol ? "classic" : "consumer", memberId, existingStaticMemberOrNull.memberId());
return newMember;
}
} else {
throwIfStaticMemberIsUnknown(existingStaticMemberOrNull, instanceId);
throwIfInstanceIdIsFenced(existingStaticMemberOrNull, group.groupId(), memberId, instanceId);
if (!useClassicProtocol) {
throwIfMemberEpochIsInvalid(existingStaticMemberOrNull, memberEpoch, ownedTopicPartitions);
}
return existingStaticMemberOrNull;
}
}
/**
* Creates the member subscription record if the updatedMember is different from
* the old member. Returns true if the subscribedTopicNames/subscribedTopicRegex
@ -1836,8 +1908,6 @@ public class GroupMetadataManager {
* @param updatedMember The updated member.
* @param subscriptionMetadata The subscription metadata.
* @param subscriptionType The group subscription type.
* @param staticMemberReplaced The boolean indicating whether the updated member
* is a static member that replaces the existing member.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
@ -1848,7 +1918,6 @@ public class GroupMetadataManager {
ConsumerGroupMember updatedMember,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType,
boolean staticMemberReplaced,
List<CoordinatorRecord> records
) {
String preferredServerAssignor = group.computePreferredServerAssignor(
@ -1867,11 +1936,11 @@ public class GroupMetadataManager {
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
if (staticMemberReplaced) {
// A new static member is replacing an older one with the same subscriptions.
// We just need to remove the older member and add the newer one. The new member should
// reuse the target assignment of the older member.
assignmentResultBuilder.removeMember(member.memberId());
// If the instance id was associated to a different member, it means that the
// static member is replaced by the current member hence we remove the previous one.
String previousMemberId = group.staticMemberId(updatedMember.instanceId());
if (previousMemberId != null && !updatedMember.memberId().equals(previousMemberId)) {
assignmentResultBuilder.removeMember(previousMemberId);
}
long startTimeMs = time.milliseconds();
@ -2011,6 +2080,42 @@ public class GroupMetadataManager {
}
}
/**
* Write records to replace the old member by the new member.
*
* @param records The list of records to append to.
* @param group The consumer group.
* @param oldMember The old member.
* @param newMember The new member.
*/
private void replaceMember(
List<CoordinatorRecord> records,
ConsumerGroup group,
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
String groupId = group.groupId();
// Remove the member without canceling its timers in case the change is reverted. If the
// change is not reverted, the group validation will fail and the timer will do nothing.
removeMember(records, groupId, oldMember.memberId());
// Generate records.
records.add(CoordinatorRecordHelpers.newMemberSubscriptionRecord(
groupId,
newMember
));
records.add(CoordinatorRecordHelpers.newTargetAssignmentRecord(
groupId,
newMember.memberId(),
group.targetAssignment(oldMember.memberId()).partitions()
));
records.add(CoordinatorRecordHelpers.newCurrentAssignmentRecord(
groupId,
newMember
));
}
/**
* Write tombstones for the member. The order matters here.
*

View File

@ -199,6 +199,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
* @return The member id corresponding to the given instance id or null if it does not exist
*/
public String staticMemberId(String groupInstanceId) {
if (groupInstanceId == null) return null;
return staticMembers.get(groupInstanceId);
}
@ -240,6 +241,40 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
return existingMemberId == null ? null : getOrMaybeCreateMember(existingMemberId, false);
}
/**
* Returns true if the static member exists.
*
* @param instanceId The instance id.
*
* @return A boolean indicating whether the member exists or not.
*/
public boolean hasStaticMember(String instanceId) {
if (instanceId == null) return false;
return staticMembers.containsKey(instanceId);
}
/**
* Returns the target assignment associated to the provided member id if
* the instance id is null; otherwise returns the target assignment associated
* to the instance id.
*
* @param memberId The member id.
* @param instanceId The instance id.
*
* @return The Assignment or EMPTY if it does not exist.
*/
public Assignment targetAssignment(String memberId, String instanceId) {
if (instanceId == null) {
return targetAssignment(memberId);
} else {
String previousMemberId = staticMemberId(instanceId);
if (previousMemberId != null) {
return targetAssignment(previousMemberId);
}
}
return Assignment.EMPTY;
}
@Override
public void updateMember(ConsumerGroupMember newMember) {
if (newMember == null) {

View File

@ -73,9 +73,16 @@ public class ConsumerGroupMember extends ModernGroupMember {
}
public Builder(ConsumerGroupMember member) {
this(
Objects.requireNonNull(member),
member.memberId
);
}
public Builder(ConsumerGroupMember member, String newMemberId) {
Objects.requireNonNull(member);
this.memberId = member.memberId;
this.memberId = Objects.requireNonNull(newMemberId);
this.memberEpoch = member.memberEpoch;
this.previousMemberEpoch = member.previousMemberEpoch;
this.instanceId = member.instanceId;

View File

@ -997,6 +997,7 @@ public class GroupMetadataManagerTest {
.setInstanceId(memberId1)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setRebalanceTimeoutMs(5000)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
@ -1046,22 +1047,6 @@ public class GroupMetadataManagerTest {
}))
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)
)));
// When the member rejoins, it gets the same assignments.
put(member2RejoinId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)
)));
}
}
));
// Member 2 leaves the consumer group.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
@ -1117,11 +1102,26 @@ public class GroupMetadataManagerTest {
rejoinResult.response()
);
ConsumerGroupMember expectedCopiedMember = new ConsumerGroupMember.Builder(member2RejoinId)
.setState(MemberState.STABLE)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
.setInstanceId(memberId2)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)))
.build();
ConsumerGroupMember expectedRejoinedMember = new ConsumerGroupMember.Builder(member2RejoinId)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setInstanceId(memberId2)
.setPreviousMemberEpoch(0)
.setInstanceId(memberId2)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setRebalanceTimeoutMs(5000)
@ -1133,14 +1133,215 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecordsAfterRejoin = Arrays.asList(
// The previous member is deleted.
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, expectedRejoinedMember),
// The previous member is replaced by the new one.
CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, expectedCopiedMember),
CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2))),
CoordinatorRecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, expectedCopiedMember),
// The new member is updated.
CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, expectedRejoinedMember)
);
assertRecordsEquals(expectedRecordsAfterRejoin, rejoinResult.records());
// Verify that there are no timers.
context.assertNoSessionTimeout(groupId, memberId2);
context.assertNoRebalanceTimeout(groupId, memberId2);
}
@Test
public void testStaticMemberRejoinsWithNewSubscribedTopics() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
String member2RejoinId = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setInstanceId("instance-id-1")
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setRebalanceTimeoutMs(5000)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.build();
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setInstanceId("instance-id-2")
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setRebalanceTimeoutMs(5000)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build();
// Consumer group with two static members.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(member1)
.withMember(member2)
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.withAssignment(memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.withAssignmentEpoch(10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
}
}))
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
put(member2RejoinId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)));
}
}
));
// Member 2 leaves the consumer group.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setInstanceId("instance-id-2")
.setMemberEpoch(-2));
// Member epoch of the response would be set to -2.
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(-2),
result.response()
);
// The departing static member will have it's epoch set to -2.
ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember.Builder(member2)
.setMemberEpoch(-2)
.build();
assertEquals(1, result.records().size());
assertRecordEquals(result.records().get(0), CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch));
// Member 2 rejoins the group with the same instance id.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> rejoinResult = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(member2RejoinId)
.setGroupId(groupId)
.setInstanceId("instance-id-2")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setServerAssignor("range")
.setSubscribedTopicNames(Arrays.asList("foo", "bar")) // bar is new.
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(member2RejoinId)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(3, 4, 5)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Arrays.asList(0, 1, 2))
))),
rejoinResult.response()
);
ConsumerGroupMember expectedCopiedMember = new ConsumerGroupMember.Builder(member2RejoinId)
.setState(MemberState.STABLE)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
.setInstanceId("instance-id-2")
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build();
ConsumerGroupMember expectedRejoinedMember = new ConsumerGroupMember.Builder(member2RejoinId)
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(0)
.setInstanceId("instance-id-2")
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)))
.build();
List<CoordinatorRecord> expectedRecordsAfterRejoin = Arrays.asList(
// The previous member is deleted.
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
// The new member is created as a copy of the previous one but
// with its new member id and new epochs.
CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, expectedCopiedMember),
CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5))),
CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, expectedCopiedMember),
// As the new member as a different subscribed topic set, a rebalance is triggered.
CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, expectedRejoinedMember),
CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}),
CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11),
CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
CoordinatorRecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, expectedRejoinedMember)
);
@ -11240,6 +11441,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
.setRebalanceTimeoutMs(500)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)))
.build())
@ -11269,6 +11471,17 @@ public class GroupMetadataManagerTest {
String newMemberId = joinResult.joinFuture.get().memberId();
assertNotEquals("", newMemberId);
ConsumerGroupMember expectedCopiedMember = new ConsumerGroupMember.Builder(newMemberId)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
.setInstanceId(instanceId)
.setState(MemberState.STABLE)
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)))
.setRebalanceTimeoutMs(500)
.build();
ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId)
.setMemberEpoch(10)
.setPreviousMemberEpoch(0)
@ -11283,8 +11496,7 @@ public class GroupMetadataManagerTest {
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(request.sessionTimeoutMs())
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(request.protocols()))
)
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(request.protocols())))
.build();
List<CoordinatorRecord> expectedRecords = Arrays.asList(
@ -11293,10 +11505,13 @@ public class GroupMetadataManagerTest {
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId),
// Create the new static member.
CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
// Replace the old static member by the new static member.
CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, expectedCopiedMember),
CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0, 1))),
CoordinatorRecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, expectedCopiedMember),
// Updated the new static member.
CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
);
assertRecordsEquals(expectedRecords, joinResult.records);