KAFKA-19546: Rebalance should be triggered by subscription change during group protocol downgrade (#20581)

Cherry-pick KAFKA-19546 to 4.1.

During online downgrade, when the last member using the consumer
protocol is a static member and it is replaced by another static member
that uses the classic protocol and has the same instance id, the new
member takes over the assignment of the replaced member and an online
downgrade is triggered.

In the current implementation, if the replacing static member has a
different subscription, no rebalance is triggered when the downgrade
happens. This patch checks whether the static member's subscription has
changed and triggers a rebalance when it has.

Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Dongnuo Lyu 2025-09-26 05:00:23 -04:00 committed by GitHub
parent 02d58b176c
commit e76213e182
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 149 additions and 94 deletions

View File

@ -1255,13 +1255,16 @@ public class GroupMetadataManager {
* @param consumerGroup The converted ConsumerGroup. * @param consumerGroup The converted ConsumerGroup.
* @param leavingMembers The leaving member(s) that triggered the downgrade validation. * @param leavingMembers The leaving member(s) that triggered the downgrade validation.
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement. * @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
* When not null, must have an instanceId that matches an existing member. * When not null, must have an instanceId that matches the replaced member.
* @param hasSubscriptionChanged The boolean indicating whether the joining member has a different subscription
* from the replaced member. Only used when joiningMember is set.
* @param records The record list to which the conversion records are added. * @param records The record list to which the conversion records are added.
*/ */
private void convertToClassicGroup( private void convertToClassicGroup(
ConsumerGroup consumerGroup, ConsumerGroup consumerGroup,
Set<ConsumerGroupMember> leavingMembers, Set<ConsumerGroupMember> leavingMembers,
ConsumerGroupMember joiningMember, ConsumerGroupMember joiningMember,
boolean hasSubscriptionChanged,
List<CoordinatorRecord> records List<CoordinatorRecord> records
) { ) {
if (joiningMember == null) { if (joiningMember == null) {
@ -1302,9 +1305,12 @@ public class GroupMetadataManager {
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member)); classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
// If the downgrade is triggered by a member leaving the group, a rebalance should be triggered. // If the downgrade is triggered by a member leaving the group or a static
// member replacement with a different subscription, a rebalance should be triggered.
if (joiningMember == null) { if (joiningMember == null) {
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId())); prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for member leaving.", classicGroup.groupId()));
} else if (hasSubscriptionChanged) {
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for static member replacement with different subscription.", classicGroup.groupId()));
} }
log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId()); log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId());
@ -2397,6 +2403,10 @@ public class GroupMetadataManager {
); );
} }
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
boolean downgrade = existingStaticMemberOrNull != null &&
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
int groupEpoch = group.groupEpoch(); int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType(); SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols); final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);
@ -2443,8 +2453,35 @@ public class GroupMetadataManager {
subscriptionType = result.subscriptionType; subscriptionType = result.subscriptionType;
} }
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between if (downgrade) {
// the existing and the new target assignment is persisted to the partition. // 2. If the static member subscription hasn't changed, reconcile the member's assignment with the existing
// assignment if the member is not fully reconciled yet. If the static member subscription has changed, a
// rebalance will be triggered during downgrade anyway so we can skip the reconciliation.
if (!bumpGroupEpoch) {
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentPartitionEpoch,
group.assignmentEpoch(),
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()),
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
records
);
}
// 3. Downgrade the consumer group.
convertToClassicGroup(
group,
Set.of(),
updatedMember,
bumpGroupEpoch,
records
);
} else {
// If no downgrade is triggered.
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
// The delta between the existing and the new target assignment is persisted to the partition.
final int targetAssignmentEpoch; final int targetAssignmentEpoch;
final Assignment targetAssignment; final Assignment targetAssignment;
@ -2461,11 +2498,9 @@ public class GroupMetadataManager {
} else { } else {
targetAssignmentEpoch = group.assignmentEpoch(); targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()); targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
} }
// 3. Reconcile the member's assignment with the target assignment if the member is not // 3. Reconcile the member's assignment with the target assignment if the member is not fully reconciled yet.
// fully reconciled yet.
updatedMember = maybeReconcile( updatedMember = maybeReconcile(
groupId, groupId,
updatedMember, updatedMember,
@ -2475,19 +2510,6 @@ public class GroupMetadataManager {
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()), toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
records records
); );
// 4. Maybe downgrade the consumer group if the last static member using the
// consumer protocol is replaced by the joining static member.
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
boolean downgrade = existingStaticMemberOrNull != null &&
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
if (downgrade) {
convertToClassicGroup(
group,
Set.of(),
updatedMember,
records
);
} }
final JoinGroupResponseData response = new JoinGroupResponseData() final JoinGroupResponseData response = new JoinGroupResponseData()
@ -4058,7 +4080,7 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records = new ArrayList<>(); List<CoordinatorRecord> records = new ArrayList<>();
if (validateOnlineDowngradeWithFencedMembers(group, members)) { if (validateOnlineDowngradeWithFencedMembers(group, members)) {
convertToClassicGroup(group, members, null, records); convertToClassicGroup(group, members, null, false, records);
return new CoordinatorResult<>(records, response, null, false); return new CoordinatorResult<>(records, response, null, false);
} else { } else {
for (ConsumerGroupMember member : members) { for (ConsumerGroupMember member : members) {

View File

@ -12451,8 +12451,11 @@ public class GroupMetadataManagerTest {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
} }
@Test @ParameterizedTest
public void testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws ExecutionException, InterruptedException { @ValueSource(booleans = {true, false})
public void testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember(
boolean isSubscriptionChanged
) throws ExecutionException, InterruptedException {
String groupId = "group-id"; String groupId = "group-id";
String memberId1 = Uuid.randomUuid().toString(); String memberId1 = Uuid.randomUuid().toString();
String oldMemberId2 = Uuid.randomUuid().toString(); String oldMemberId2 = Uuid.randomUuid().toString();
@ -12463,11 +12466,9 @@ public class GroupMetadataManagerTest {
Uuid barTopicId = Uuid.randomUuid(); Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar"; String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = List.of( List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = List.of(
new ConsumerGroupMemberMetadataValue.ClassicProtocol() new ConsumerGroupMemberMetadataValue.ClassicProtocol()
.setName("range") .setName(NoOpPartitionAssignor.NAME)
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
List.of(fooTopicName, barTopicName), List.of(fooTopicName, barTopicName),
null, null,
@ -12487,8 +12488,8 @@ public class GroupMetadataManagerTest {
.setPreviousMemberEpoch(9) .setPreviousMemberEpoch(9)
.setClientId(DEFAULT_CLIENT_ID) .setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(List.of("foo", "bar")) .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
.setServerAssignorName("range") .setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(45000) .setRebalanceTimeoutMs(45000)
.setClassicMemberMetadata( .setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
@ -12506,8 +12507,8 @@ public class GroupMetadataManagerTest {
.setPreviousMemberEpoch(9) .setPreviousMemberEpoch(9)
.setClientId(DEFAULT_CLIENT_ID) .setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(List.of("foo")) .setSubscribedTopicNames(List.of(fooTopicName))
.setServerAssignorName("range") .setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(45000) .setRebalanceTimeoutMs(45000)
.setAssignedPartitions(mkAssignment( .setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5))) mkTopicAssignment(fooTopicId, 3, 4, 5)))
@ -12518,12 +12519,14 @@ public class GroupMetadataManagerTest {
.addTopic(barTopicId, barTopicName, 2) .addTopic(barTopicId, barTopicName, 2)
.addRacks() .addRacks()
.build(); .build();
long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
long barTopicHash = computeTopicHash(barTopicName, metadataImage);
// Consumer group with two members. // Consumer group with two members.
// Member 1 uses the classic protocol and static member 2 uses the consumer protocol. // Member 1 uses the classic protocol and static member 2 uses the consumer protocol.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new NoOpPartitionAssignor()))
.withMetadataImage(metadataImage) .withMetadataImage(metadataImage)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(member1) .withMember(member1)
@ -12543,12 +12546,19 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
// A new member using classic protocol with the same instance id joins, scheduling the downgrade. // A new member using classic protocol with the same instance id joins, scheduling the downgrade.
byte[] protocolsMetadata2 = Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
isSubscriptionChanged ? List.of(fooTopicName, barTopicName) : List.of(fooTopicName))));
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
protocols2.add(new JoinGroupRequestProtocol()
.setName(NoOpPartitionAssignor.NAME)
.setMetadata(protocolsMetadata2));
JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId(groupId) .withGroupId(groupId)
.withMemberId(UNKNOWN_MEMBER_ID) .withMemberId(UNKNOWN_MEMBER_ID)
.withGroupInstanceId(instanceId) .withGroupInstanceId(instanceId)
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.withDefaultProtocolTypeAndProtocols() .withProtocols(protocols2)
.build(); .build();
GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest); GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest);
result.appendFuture.complete(null); result.appendFuture.complete(null);
@ -12560,14 +12570,15 @@ public class GroupMetadataManagerTest {
.build(); .build();
ConsumerGroupMember expectedNewClassicMember2 = new ConsumerGroupMember.Builder(oldMember2, newMemberId2) ConsumerGroupMember expectedNewClassicMember2 = new ConsumerGroupMember.Builder(oldMember2, newMemberId2)
.setPreviousMemberEpoch(0) .setPreviousMemberEpoch(0)
.setMemberEpoch(isSubscriptionChanged ? 11 : 10)
.setSubscribedTopicNames(isSubscriptionChanged ? List.of(fooTopicName, barTopicName) : List.of(fooTopicName))
.setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs()) .setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
.setClassicMemberMetadata( .setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(joinRequest.sessionTimeoutMs()) .setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
.setSupportedProtocols(List.of(new ConsumerGroupMemberMetadataValue.ClassicProtocol() .setSupportedProtocols(List.of(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
.setName("range") .setName(NoOpPartitionAssignor.NAME)
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( .setMetadata(protocolsMetadata2)))
List.of(fooTopicName)))))))
).build(); ).build();
byte[] assignment1 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of( byte[] assignment1 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of(
@ -12594,7 +12605,7 @@ public class GroupMetadataManagerTest {
context.time, context.time,
10, 10,
Optional.of(ConsumerProtocol.PROTOCOL_TYPE), Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
Optional.of("range"), Optional.of(NoOpPartitionAssignor.NAME),
Optional.of(memberId1), Optional.of(memberId1),
Optional.of(context.time.milliseconds()) Optional.of(context.time.milliseconds())
); );
@ -12630,8 +12641,7 @@ public class GroupMetadataManagerTest {
assertTrue(Set.of(memberId1, newMemberId2).contains(leader)); assertTrue(Set.of(memberId1, newMemberId2).contains(leader));
expectedClassicGroup.setLeaderId(Optional.of(leader)); expectedClassicGroup.setLeaderId(Optional.of(leader));
assertUnorderedRecordsEquals( List<List<CoordinatorRecord>> replacingRecords = List.of(
List.of(
// Remove the existing member 2 that uses the consumer protocol. // Remove the existing member 2 that uses the consumer protocol.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, oldMemberId2)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, oldMemberId2)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, oldMemberId2)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, oldMemberId2)),
@ -12640,12 +12650,26 @@ public class GroupMetadataManagerTest {
// Create the new member 2 that uses the consumer protocol. // Create the new member 2 that uses the consumer protocol.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewConsumerMember2)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewConsumerMember2)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId2, expectedNewConsumerMember2.assignedPartitions())), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewConsumerMember2)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewConsumerMember2))
);
// Update the new member 2 to the member that uses classic protocol. List<List<CoordinatorRecord>> memberUpdateRecords;
if (isSubscriptionChanged) {
memberUpdateRecords = List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewClassicMember2)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewClassicMember2)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewClassicMember2)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, fooTopicHash,
barTopicName, barTopicHash
))))
);
} else {
memberUpdateRecords = List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewClassicMember2)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewClassicMember2))
);
}
List<List<CoordinatorRecord>> downgradeRecords = List.of(
// Remove member 1, member 2 and the consumer group. // Remove member 1, member 2 and the consumer group.
List.of( List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
@ -12665,7 +12689,12 @@ public class GroupMetadataManagerTest {
// Create the classic group. // Create the classic group.
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments)) List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments))
), );
assertUnorderedRecordsEquals(
Stream.of(replacingRecords, memberUpdateRecords, downgradeRecords)
.flatMap(List::stream)
.collect(Collectors.toList()),
result.records result.records
); );
@ -12675,10 +12704,14 @@ public class GroupMetadataManagerTest {
); );
assertNotNull(heartbeatTimeout); assertNotNull(heartbeatTimeout);
// No rebalance is triggered. // If the subscription is changed, a rebalance is triggered.
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false); ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
if (isSubscriptionChanged) {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
} else {
assertTrue(classicGroup.isInState(STABLE)); assertTrue(classicGroup.isInState(STABLE));
} }
}
@Test @Test
public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {