KAFKA-16554: Online downgrade triggering and group type conversion (#15721)

Online downgrade from a consumer group to a classic group is triggered when the last consumer that uses the consumer protocol leaves the group. A rebalance is manually triggered after the group conversion. This patch adds consumer group downgrade validation and conversion.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Dongnuo Lyu 2024-04-25 10:44:25 -04:00 committed by GitHub
parent bed23b7978
commit dc1d8fc330
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 932 additions and 47 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
@ -777,11 +778,87 @@ public class GroupMetadataManager {
}
}
/**
* Validates the online downgrade if a consumer member is fenced from the consumer group.
*
* @param consumerGroup The ConsumerGroup.
* @param memberId The fenced member id.
* @return A boolean indicating whether it's valid to online downgrade the consumer group.
*/
private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
consumerGroup.groupId());
return false;
} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
consumerGroup.groupId());
return false;
} else if (consumerGroup.numMembers() <= 1) {
log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
consumerGroup.groupId());
return false;
} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
consumerGroup.groupId());
}
return true;
}
/**
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
*
* @param consumerGroup The converted ConsumerGroup.
* @param leavingMemberId The leaving member that triggers the downgrade validation.
* @param records The list of Records.
* @return An appendFuture of the conversion.
*/
private CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
consumerGroup.createGroupTombstoneRecords(records);
ClassicGroup classicGroup;
try {
classicGroup = ClassicGroup.fromConsumerGroup(
consumerGroup,
leavingMemberId,
logContext,
time,
metrics,
consumerGroupSessionTimeoutMs,
metadataImage
);
} catch (SchemaException e) {
log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
consumerGroup.groupId(), e.getMessage()));
}
classicGroup.createClassicGroupRecords(metadataImage.features().metadataVersion(), records);
// Directly update the states instead of replaying the records because
// the classicGroup reference is needed for triggering the rebalance.
// Set the appendFuture to prevent the records from being replayed.
removeGroup(consumerGroup.groupId());
groups.put(consumerGroup.groupId(), classicGroup);
metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.exceptionally(__ -> {
metrics.onClassicGroupStateTransition(classicGroup.currentState(), null);
return null;
});
return appendFuture;
}
/**
* Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
*
* @param classicGroup A ClassicGroup.
* @return The boolean indicating whether it's valid to online upgrade the classic group.
* @return A boolean indicating whether it's valid to online upgrade the classic group.
*/
private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
@ -1421,11 +1498,12 @@ public class GroupMetadataManager {
int memberEpoch
) throws ApiException {
ConsumerGroup group = consumerGroup(groupId);
List<Record> records;
List<Record> records = new ArrayList<>();
CompletableFuture<Void> appendFuture = null;
if (instanceId == null) {
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId);
records = consumerGroupFenceMember(group, member);
appendFuture = consumerGroupFenceMember(group, member, records);
} else {
ConsumerGroupMember member = group.staticMember(instanceId);
throwIfStaticMemberIsUnknown(member, instanceId);
@ -1437,12 +1515,17 @@ public class GroupMetadataManager {
} else {
log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group.",
group.groupId(), memberId, instanceId);
records = consumerGroupFenceMember(group, member);
appendFuture = consumerGroupFenceMember(group, member, records);
}
}
return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch));
return new CoordinatorResult<>(
records,
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch),
appendFuture
);
}
/**
@ -1470,42 +1553,45 @@ public class GroupMetadataManager {
}
/**
* Fences a member from a consumer group.
* Fences a member from a consumer group and maybe downgrade the consumer group to a classic group.
*
* @param group The group.
* @param member The member.
*
* @return A list of records to be applied to the state.
* @param group The group.
* @param member The member.
* @param records The list of records to be applied to the state.
* @return The append future to be applied.
*/
private List<Record> consumerGroupFenceMember(
private CompletableFuture<Void> consumerGroupFenceMember(
ConsumerGroup group,
ConsumerGroupMember member
ConsumerGroupMember member,
List<Record> records
) {
List<Record> records = new ArrayList<>();
if (validateOnlineDowngrade(group, member.memberId())) {
return convertToClassicGroup(group, member.memberId(), records);
} else {
removeMember(records, group.groupId(), member.memberId());
removeMember(records, group.groupId(), member.memberId());
// We update the subscription metadata without the leaving member.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
member,
null,
metadataImage.topics(),
metadataImage.cluster()
);
// We update the subscription metadata without the leaving member.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
member,
null,
metadataImage.topics(),
metadataImage.cluster()
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId {}] Computed new subscription metadata: {}.",
group.groupId(), subscriptionMetadata);
records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
}
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId {}] Computed new subscription metadata: {}.",
group.groupId(), subscriptionMetadata);
records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
cancelTimers(group.groupId(), member.memberId());
return null;
}
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
cancelTimers(group.groupId(), member.memberId());
return records;
}
/**
@ -1549,7 +1635,10 @@ public class GroupMetadataManager {
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
log.info("[GroupId {}] Member {} fenced from the group because its session expired.",
groupId, memberId);
return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
List<Record> records = new ArrayList<>();
CompletableFuture<Void> appendFuture = consumerGroupFenceMember(group, member, records);
return new CoordinatorResult<>(records, appendFuture);
} catch (GroupIdNotFoundException ex) {
log.debug("[GroupId {}] Could not fence {} because the group does not exist.",
groupId, memberId);
@ -1599,7 +1688,10 @@ public class GroupMetadataManager {
log.info("[GroupId {}] Member {} fenced from the group because " +
"it failed to transition from epoch {} within {}ms.",
groupId, memberId, memberEpoch, rebalanceTimeoutMs);
return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
List<Record> records = new ArrayList<>();
CompletableFuture<Void> appendFuture = consumerGroupFenceMember(group, member, records);
return new CoordinatorResult<>(records, appendFuture);
} else {
log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " +
"left the epoch {}.", groupId, memberId, memberEpoch);

View File

@ -16,7 +16,10 @@
*/
package org.apache.kafka.coordinator.group.classic;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
@ -32,15 +35,22 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@ -1347,6 +1357,116 @@ public class ClassicGroup implements Group {
));
}
/**
* Convert the given ConsumerGroup to a corresponding ClassicGroup.
* The member with leavingMemberId will not be converted to the new ClassicGroup as it's the last
* member using new consumer protocol that left and triggered the downgrade.
*
* @param consumerGroup The converted ConsumerGroup.
* @param leavingMemberId The member that will not be converted in the ClassicGroup.
* @param logContext The logContext to create the ClassicGroup.
* @param time The time to create the ClassicGroup.
* @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
* @param metadataImage The MetadataImage.
* @return The created ClassicGroup.
*/
public static ClassicGroup fromConsumerGroup(
ConsumerGroup consumerGroup,
String leavingMemberId,
LogContext logContext,
Time time,
GroupCoordinatorMetricsShard metrics,
int consumerGroupSessionTimeoutMs,
MetadataImage metadataImage
) {
ClassicGroup classicGroup = new ClassicGroup(
logContext,
consumerGroup.groupId(),
ClassicGroupState.STABLE,
time,
metrics,
consumerGroup.groupEpoch(),
Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
Optional.empty(),
Optional.empty(),
Optional.of(time.milliseconds())
);
consumerGroup.members().forEach((memberId, member) -> {
if (!memberId.equals(leavingMemberId)) {
classicGroup.add(
new ClassicGroupMember(
memberId,
Optional.ofNullable(member.instanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeoutMs(),
consumerGroupSessionTimeoutMs,
ConsumerProtocol.PROTOCOL_TYPE,
member.supportedJoinGroupRequestProtocols(),
null
)
);
}
});
classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
classicGroup.allMembers().forEach(classicGroupMember -> {
// Set the assignment with serializing the ConsumerGroup's targetAssignment.
// The serializing version should align with that of the member's JoinGroupRequestProtocol.
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(ClassicGroup.topicPartitionListFromMap(
consumerGroup.targetAssignment().get(classicGroupMember.memberId()).partitions(),
metadataImage.topics()
)),
ConsumerProtocol.deserializeVersion(
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().orElse("")))
)
));
classicGroupMember.setAssignment(assignment);
});
return classicGroup;
}
/**
* Populate the record list with the records needed to create the given classic group.
*
* @param metadataVersion The MetadataVersion.
* @param records The list to which the new records are added.
*/
public void createClassicGroupRecords(
MetadataVersion metadataVersion,
List<Record> records
) {
Map<String, byte[]> assignments = new HashMap<>();
allMembers().forEach(classicGroupMember ->
assignments.put(classicGroupMember.memberId(), classicGroupMember.assignment())
);
records.add(RecordHelpers.newGroupMetadataRecord(this, assignments, metadataVersion));
}
/**
* @return The list of TopicPartition converted from the map of topic id and partition set.
*/
private static List<TopicPartition> topicPartitionListFromMap(
Map<Uuid, Set<Integer>> topicPartitions,
TopicsImage topicsImage
) {
List<TopicPartition> topicPartitionList = new ArrayList<>();
topicPartitions.forEach((topicId, partitionSet) -> {
TopicImage topicImage = topicsImage.getTopic(topicId);
if (topicImage != null) {
partitionSet.forEach(partition -> topicPartitionList.add(new TopicPartition(topicImage.name(), partition)));
}
});
return topicPartitionList;
}
/**
* Checks whether the transition to the target state is valid.
*

View File

@ -771,7 +771,19 @@ public class ConsumerGroup implements Group {
*/
@Override
public void createGroupTombstoneRecords(List<Record> records) {
members().forEach((memberId, member) ->
records.add(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId(), memberId))
);
members().forEach((memberId, member) ->
records.add(RecordHelpers.newTargetAssignmentTombstoneRecord(groupId(), memberId))
);
records.add(RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()));
members().forEach((memberId, member) ->
records.add(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId(), memberId))
);
records.add(RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()));
records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
}
@ -1212,9 +1224,13 @@ public class ConsumerGroup implements Group {
}
/**
* Checks whether all the members use the classic protocol except the given member.
*
* @param memberId The member to remove.
* @return A boolean indicating whether all the members use the classic protocol.
*/
public boolean allMembersUseClassicProtocol() {
return numClassicProtocolMembers() == members().size();
public boolean allMembersUseClassicProtocolExcept(String memberId) {
return numClassicProtocolMembers() == members().size() - 1 &&
!getOrMaybeCreateMember(memberId, false).useClassicProtocol();
}
}

View File

@ -479,6 +479,22 @@ public class ConsumerGroupMember {
return partitionsPendingRevocation;
}
/**
* @return The supported classic protocols converted to JoinGroupRequestProtocolCollection.
*/
public JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedJoinGroupRequestProtocols() {
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
supportedClassicProtocols().ifPresent(classicProtocols -> classicProtocols.forEach(protocol ->
protocols.add(
new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName(protocol.name())
.setMetadata(protocol.metadata())
)
));
return protocols;
}
/**
* @return The classicMemberMetadata if the consumer uses the classic protocol.
*/
@ -554,7 +570,7 @@ public class ConsumerGroupMember {
}
/**
* @return The boolean indicating whether the member uses the classic protocol.
* @return A boolean indicating whether the member uses the classic protocol.
*/
public boolean useClassicProtocol() {
return classicMemberMetadata != null;

View File

@ -16,13 +16,22 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.opentest4j.AssertionFailedError;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -198,6 +207,43 @@ public class Assertions {
}
}
}
} else if (actual.message() instanceof GroupMetadataValue) {
GroupMetadataValue expectedValue = (GroupMetadataValue) expected.message().duplicate();
GroupMetadataValue actualValue = (GroupMetadataValue) actual.message().duplicate();
expectedValue.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
actualValue.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
try {
Arrays.asList(expectedValue, actualValue).forEach(value ->
value.members().forEach(memberMetadata -> {
// Sort topics and ownedPartitions in Subscription.
ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
subscription.topics().sort(String::compareTo);
subscription.ownedPartitions().sort(
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
);
memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
subscription,
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
)));
// Sort partitions in Assignment.
ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
assignment.partitions().sort(
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
);
memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
assignment,
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
)));
})
);
} catch (SchemaException ex) {
fail("Failed deserialization: " + ex.getMessage());
}
assertEquals(expectedValue, actualValue);
} else {
assertEquals(expected.message(), actual.message());
}

View File

@ -68,6 +68,7 @@ import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.MemberState;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
@ -110,6 +111,7 @@ import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedListE
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupJoinKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
@ -10331,6 +10333,594 @@ public class GroupMetadataManagerTest {
.setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get());
}
@Test
public void testLastConsumerProtocolMemberLeavingConsumerGroup() {
String groupId = "group-id";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
.setName("range")
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
Arrays.asList(fooTopicName, barTopicName),
null,
Arrays.asList(
new TopicPartition(fooTopicName, 0),
new TopicPartition(fooTopicName, 1),
new TopicPartition(fooTopicName, 2),
new TopicPartition(barTopicName, 0),
new TopicPartition(barTopicName, 1)
)
))))
);
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setRebalanceTimeoutMs(45000)
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSupportedProtocols(protocols))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)))
.build();
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setRebalanceTimeoutMs(45000)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)))
.build();
// Consumer group with two members.
// Member 1 uses the classic protocol and member 2 uses the consumer protocol.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(member1)
.withMember(member2)
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)))
.withAssignment(memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)))
.withAssignmentEpoch(10))
.build();
context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}));
context.commit();
ConsumerGroup consumerGroup = context.groupMetadataManager.consumerGroup(groupId);
// Member 2 leaves the consumer group, triggering the downgrade.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Collections.emptyList()));
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
new TopicPartition(fooTopicName, 0),
new TopicPartition(fooTopicName, 1),
new TopicPartition(fooTopicName, 2),
new TopicPartition(barTopicName, 0),
new TopicPartition(barTopicName, 1)
))));
Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
{
put(memberId1, assignment);
}
};
ClassicGroup expectedClassicGroup = new ClassicGroup(
new LogContext(),
groupId,
STABLE,
context.time,
context.metrics,
10,
Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
Optional.ofNullable("range"),
Optional.ofNullable(memberId1),
Optional.of(context.time.milliseconds())
);
expectedClassicGroup.add(
new ClassicGroupMember(
memberId1,
Optional.ofNullable(member1.instanceId()),
member1.clientId(),
member1.clientHost(),
member1.rebalanceTimeoutMs(),
45000,
ConsumerProtocol.PROTOCOL_TYPE,
member1.supportedJoinGroupRequestProtocols(),
assignment
)
);
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId1),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
RecordHelpers.newGroupEpochTombstoneRecord(groupId),
RecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
);
assertUnorderedListEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2));
assertUnorderedListEquals(expectedRecords.subList(2, 4), result.records().subList(2, 4));
assertRecordEquals(expectedRecords.get(4), result.records().get(4));
assertUnorderedListEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7));
assertRecordsEquals(expectedRecords.subList(7, 9), result.records().subList(7, 9));
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, Record> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
// The new rebalance has a groupJoin timeout.
ScheduledTimeout<Void, Record> groupJoinTimeout = context.timer.timeout(
classicGroupJoinKey(groupId)
);
assertNotNull(groupJoinTimeout);
// A new rebalance is triggered.
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
// Simulate a failed write to the log.
result.appendFuture().completeExceptionally(new NotLeaderOrFollowerException());
context.rollback();
// The group is reverted back to the consumer group.
assertEquals(consumerGroup, context.groupMetadataManager.consumerGroup(groupId));
verify(context.metrics, times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null);
}
@Test
public void testLastConsumerProtocolMemberSessionTimeoutInConsumerGroup() {
String groupId = "group-id";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
.setName("range")
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
Arrays.asList(fooTopicName, barTopicName),
null,
Arrays.asList(
new TopicPartition(fooTopicName, 0),
new TopicPartition(fooTopicName, 1),
new TopicPartition(fooTopicName, 2),
new TopicPartition(barTopicName, 0),
new TopicPartition(barTopicName, 1)
)
))))
);
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setRebalanceTimeoutMs(45000)
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSupportedProtocols(protocols))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)))
.build();
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setRebalanceTimeoutMs(45000)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)))
.build();
// Consumer group with two members.
// Member 1 uses the classic protocol and member 2 uses the consumer protocol.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(member1)
.withMember(member2)
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)))
.withAssignment(memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)))
.withAssignmentEpoch(10))
.build();
context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}));
context.commit();
// Session timer is scheduled on the heartbeat.
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(10)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Collections.emptyList()));
// Verify that there is a session timeout.
context.assertSessionTimeout(groupId, memberId2, 45000);
// Advance time past the session timeout.
// Member 2 should be fenced from the group, thus triggering the downgrade.
MockCoordinatorTimer.ExpiredTimeout<Void, Record> timeout = context.sleep(45000 + 1).get(0);
assertEquals(consumerGroupSessionTimeoutKey(groupId, memberId2), timeout.key);
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
new TopicPartition(fooTopicName, 0),
new TopicPartition(fooTopicName, 1),
new TopicPartition(fooTopicName, 2),
new TopicPartition(barTopicName, 0),
new TopicPartition(barTopicName, 1)
))));
Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
{
put(memberId1, assignment);
}
};
ClassicGroup expectedClassicGroup = new ClassicGroup(
new LogContext(),
groupId,
STABLE,
context.time,
context.metrics,
10,
Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
Optional.ofNullable("range"),
Optional.ofNullable(memberId1),
Optional.of(context.time.milliseconds())
);
expectedClassicGroup.add(
new ClassicGroupMember(
memberId1,
Optional.ofNullable(member1.instanceId()),
member1.clientId(),
member1.clientHost(),
member1.rebalanceTimeoutMs(),
45000,
ConsumerProtocol.PROTOCOL_TYPE,
member1.supportedJoinGroupRequestProtocols(),
assignment
)
);
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId1),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
RecordHelpers.newGroupEpochTombstoneRecord(groupId),
RecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
);
assertUnorderedListEquals(expectedRecords.subList(0, 2), timeout.result.records().subList(0, 2));
assertUnorderedListEquals(expectedRecords.subList(2, 4), timeout.result.records().subList(2, 4));
assertRecordEquals(expectedRecords.get(4), timeout.result.records().get(4));
assertUnorderedListEquals(expectedRecords.subList(5, 7), timeout.result.records().subList(5, 7));
assertRecordsEquals(expectedRecords.subList(7, 9), timeout.result.records().subList(7, 9));
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, Record> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
// The new rebalance has a groupJoin timeout.
ScheduledTimeout<Void, Record> groupJoinTimeout = context.timer.timeout(
classicGroupJoinKey(groupId)
);
assertNotNull(groupJoinTimeout);
// A new rebalance is triggered.
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
@Test
public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
String groupId = "group-id";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
Uuid zarTopicId = Uuid.randomUuid();
String zarTopicName = "zar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
.setName("range")
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
Arrays.asList(fooTopicName, barTopicName),
null,
Arrays.asList(
new TopicPartition(fooTopicName, 0),
new TopicPartition(fooTopicName, 1),
new TopicPartition(fooTopicName, 2),
new TopicPartition(barTopicName, 0),
new TopicPartition(barTopicName, 1)
)
))))
);
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setRebalanceTimeoutMs(30000)
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSupportedProtocols(protocols))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)))
.build();
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
.setServerAssignorName("range")
.setRebalanceTimeoutMs(30000)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)))
.build();
// Consumer group with two members.
// Member 1 uses the classic protocol and member 2 uses the consumer protocol.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addTopic(zarTopicId, zarTopicName, 1)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(member1)
.withMember(member2)
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)))
.withAssignment(memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)))
.withAssignmentEpoch(10))
.build();
context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1)));
}
}));
context.commit();
// Prepare the new assignment.
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)
)));
put(memberId2, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)
)));
}
}
));
// Member 2 heartbeats with a different subscribedTopicNames. The assignor computes a new assignment
// where member 2 will need to revoke topic partition bar-2 thus transitions to the REVOKING state.
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(10)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(3, 4, 5)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Arrays.asList(2))
))
);
// Verify that there is a rebalance timeout.
context.assertRebalanceTimeout(groupId, memberId2, 30000);
// Advance time past the session timeout.
// Member 2 should be fenced from the group, thus triggering the downgrade.
MockCoordinatorTimer.ExpiredTimeout<Void, Record> timeout = context.sleep(30000 + 1).get(0);
assertEquals(consumerGroupRebalanceTimeoutKey(groupId, memberId2), timeout.key);
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
new TopicPartition(fooTopicName, 0),
new TopicPartition(fooTopicName, 1),
new TopicPartition(fooTopicName, 2),
new TopicPartition(barTopicName, 0),
new TopicPartition(barTopicName, 1)
))));
Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
{
put(memberId1, assignment);
}
};
ClassicGroup expectedClassicGroup = new ClassicGroup(
new LogContext(),
groupId,
STABLE,
context.time,
context.metrics,
11,
Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
Optional.ofNullable("range"),
Optional.ofNullable(memberId1),
Optional.of(context.time.milliseconds())
);
expectedClassicGroup.add(
new ClassicGroupMember(
memberId1,
Optional.ofNullable(member1.instanceId()),
member1.clientId(),
member1.clientHost(),
member1.rebalanceTimeoutMs(),
45000,
ConsumerProtocol.PROTOCOL_TYPE,
member1.supportedJoinGroupRequestProtocols(),
assignment
)
);
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId1),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
RecordHelpers.newGroupEpochTombstoneRecord(groupId),
RecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
);
assertUnorderedListEquals(expectedRecords.subList(0, 2), timeout.result.records().subList(0, 2));
assertUnorderedListEquals(expectedRecords.subList(2, 4), timeout.result.records().subList(2, 4));
assertRecordEquals(expectedRecords.get(4), timeout.result.records().get(4));
assertUnorderedListEquals(expectedRecords.subList(5, 7), timeout.result.records().subList(5, 7));
assertRecordsEquals(expectedRecords.subList(7, 9), timeout.result.records().subList(7, 9));
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, null);
verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, Record> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
// The new rebalance has a groupJoin timeout.
ScheduledTimeout<Void, Record> groupJoinTimeout = context.timer.timeout(
classicGroupJoinKey(groupId)
);
assertNotNull(groupJoinTimeout);
// A new rebalance is triggered.
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,

View File

@ -528,7 +528,9 @@ public class GroupMetadataManagerTestContext {
request
);
result.records().forEach(this::replay);
if (result.appendFuture() == null) {
result.records().forEach(this::replay);
}
return result;
}

View File

@ -1111,7 +1111,7 @@ public class ConsumerGroupTest {
}
@Test
public void testAllMembersUseClassicProtocol() {
public void testNumClassicProtocolMembers() {
ConsumerGroup consumerGroup = createConsumerGroup("foo");
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new ArrayList<>();
protocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
@ -1125,27 +1125,30 @@ public class ConsumerGroupTest {
.build();
consumerGroup.updateMember(member1);
assertEquals(1, consumerGroup.numClassicProtocolMembers());
assertTrue(consumerGroup.allMembersUseClassicProtocol());
// The group has member 1 (using the classic protocol) and member 2 (using the consumer protocol).
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member-2")
.build();
consumerGroup.updateMember(member2);
assertEquals(1, consumerGroup.numClassicProtocolMembers());
assertFalse(consumerGroup.allMembersUseClassicProtocol());
assertFalse(consumerGroup.allMembersUseClassicProtocolExcept("member-1"));
assertTrue(consumerGroup.allMembersUseClassicProtocolExcept("member-2"));
// The group has member 2 (using the consumer protocol).
// The group has member 2 (using the consumer protocol) and member 3 (using the consumer protocol).
consumerGroup.removeMember(member1.memberId());
ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member-3")
.build();
consumerGroup.updateMember(member3);
assertEquals(0, consumerGroup.numClassicProtocolMembers());
assertFalse(consumerGroup.allMembersUseClassicProtocol());
assertFalse(consumerGroup.allMembersUseClassicProtocolExcept("member-2"));
// The group has member 2 (using the classic protocol).
consumerGroup.removeMember(member2.memberId());
member2 = new ConsumerGroupMember.Builder("member-2")
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSupportedProtocols(protocols))
.build();
consumerGroup.updateMember(member2);
assertEquals(1, consumerGroup.numClassicProtocolMembers());
assertTrue(consumerGroup.allMembersUseClassicProtocol());
}
}