mirror of https://github.com/apache/kafka.git
KAFKA-19546: Rebalance should be triggered by subscription change during group protocol downgrade (#20417)
During online downgrade, when the last member using the consumer protocol is a static member and it is replaced by another static member that uses the classic protocol with the same instance id, the replacing member takes over the assignment of the replaced member and an online downgrade is triggered. In the current implementation, if the replacing static member has a different subscription, no rebalance is triggered when the downgrade happens. This patch checks whether the replacing static member's subscription differs from that of the replaced member and, if so, triggers a rebalance as part of the downgrade. Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
parent
55020f909d
commit
cbea4f69bd
|
@ -1260,13 +1260,16 @@ public class GroupMetadataManager {
|
|||
* @param consumerGroup The converted ConsumerGroup.
|
||||
* @param leavingMembers The leaving member(s) that triggered the downgrade validation.
|
||||
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
|
||||
* When not null, must have an instanceId that matches an existing member.
|
||||
* When not null, must have an instanceId that matches the replaced member.
|
||||
* @param hasSubscriptionChanged The boolean indicating whether the joining member has a different subscription
|
||||
* from the replaced member. Only used when joiningMember is set.
|
||||
* @param records The record list to which the conversion records are added.
|
||||
*/
|
||||
private void convertToClassicGroup(
|
||||
ConsumerGroup consumerGroup,
|
||||
Set<ConsumerGroupMember> leavingMembers,
|
||||
ConsumerGroupMember joiningMember,
|
||||
boolean hasSubscriptionChanged,
|
||||
List<CoordinatorRecord> records
|
||||
) {
|
||||
if (joiningMember == null) {
|
||||
|
@ -1307,9 +1310,12 @@ public class GroupMetadataManager {
|
|||
|
||||
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
|
||||
|
||||
// If the downgrade is triggered by a member leaving the group, a rebalance should be triggered.
|
||||
// If the downgrade is triggered by a member leaving the group or a static
|
||||
// member replacement with a different subscription, a rebalance should be triggered.
|
||||
if (joiningMember == null) {
|
||||
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
|
||||
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for member leaving.", classicGroup.groupId()));
|
||||
} else if (hasSubscriptionChanged) {
|
||||
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for static member replacement with different subscription.", classicGroup.groupId()));
|
||||
}
|
||||
|
||||
log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId());
|
||||
|
@ -2401,6 +2407,10 @@ public class GroupMetadataManager {
|
|||
);
|
||||
}
|
||||
|
||||
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
|
||||
boolean downgrade = existingStaticMemberOrNull != null &&
|
||||
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
|
||||
|
||||
int groupEpoch = group.groupEpoch();
|
||||
SubscriptionType subscriptionType = group.subscriptionType();
|
||||
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);
|
||||
|
@ -2447,8 +2457,35 @@ public class GroupMetadataManager {
|
|||
subscriptionType = result.subscriptionType;
|
||||
}
|
||||
|
||||
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
|
||||
// the existing and the new target assignment is persisted to the partition.
|
||||
if (downgrade) {
|
||||
// 2. If the static member subscription hasn't changed, reconcile the member's assignment with the existing
|
||||
// assignment if the member is not fully reconciled yet. If the static member subscription has changed, a
|
||||
// rebalance will be triggered during downgrade anyway so we can skip the reconciliation.
|
||||
if (!bumpGroupEpoch) {
|
||||
updatedMember = maybeReconcile(
|
||||
groupId,
|
||||
updatedMember,
|
||||
group::currentPartitionEpoch,
|
||||
group.assignmentEpoch(),
|
||||
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()),
|
||||
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
|
||||
records
|
||||
);
|
||||
}
|
||||
|
||||
// 3. Downgrade the consumer group.
|
||||
convertToClassicGroup(
|
||||
group,
|
||||
Set.of(),
|
||||
updatedMember,
|
||||
bumpGroupEpoch,
|
||||
records
|
||||
);
|
||||
} else {
|
||||
// If no downgrade is triggered.
|
||||
|
||||
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
|
||||
// The delta between the existing and the new target assignment is persisted to the partition.
|
||||
final int targetAssignmentEpoch;
|
||||
final Assignment targetAssignment;
|
||||
|
||||
|
@ -2465,11 +2502,9 @@ public class GroupMetadataManager {
|
|||
} else {
|
||||
targetAssignmentEpoch = group.assignmentEpoch();
|
||||
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
|
||||
|
||||
}
|
||||
|
||||
// 3. Reconcile the member's assignment with the target assignment if the member is not
|
||||
// fully reconciled yet.
|
||||
// 3. Reconcile the member's assignment with the target assignment if the member is not fully reconciled yet.
|
||||
updatedMember = maybeReconcile(
|
||||
groupId,
|
||||
updatedMember,
|
||||
|
@ -2479,19 +2514,6 @@ public class GroupMetadataManager {
|
|||
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
|
||||
records
|
||||
);
|
||||
|
||||
// 4. Maybe downgrade the consumer group if the last static member using the
|
||||
// consumer protocol is replaced by the joining static member.
|
||||
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
|
||||
boolean downgrade = existingStaticMemberOrNull != null &&
|
||||
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
|
||||
if (downgrade) {
|
||||
convertToClassicGroup(
|
||||
group,
|
||||
Set.of(),
|
||||
updatedMember,
|
||||
records
|
||||
);
|
||||
}
|
||||
|
||||
final JoinGroupResponseData response = new JoinGroupResponseData()
|
||||
|
@ -4084,7 +4106,7 @@ public class GroupMetadataManager {
|
|||
|
||||
List<CoordinatorRecord> records = new ArrayList<>();
|
||||
if (validateOnlineDowngradeWithFencedMembers(group, members)) {
|
||||
convertToClassicGroup(group, members, null, records);
|
||||
convertToClassicGroup(group, members, null, false, records);
|
||||
return new CoordinatorResult<>(records, response, null, false);
|
||||
} else {
|
||||
for (ConsumerGroupMember member : members) {
|
||||
|
|
|
@ -12457,8 +12457,11 @@ public class GroupMetadataManagerTest {
|
|||
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws ExecutionException, InterruptedException {
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember(
|
||||
boolean isSubscriptionChanged
|
||||
) throws ExecutionException, InterruptedException {
|
||||
String groupId = "group-id";
|
||||
String memberId1 = Uuid.randomUuid().toString();
|
||||
String oldMemberId2 = Uuid.randomUuid().toString();
|
||||
|
@ -12469,11 +12472,9 @@ public class GroupMetadataManagerTest {
|
|||
Uuid barTopicId = Uuid.randomUuid();
|
||||
String barTopicName = "bar";
|
||||
|
||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||
|
||||
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = List.of(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
|
||||
.setName("range")
|
||||
.setName(NoOpPartitionAssignor.NAME)
|
||||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
List.of(fooTopicName, barTopicName),
|
||||
null,
|
||||
|
@ -12493,8 +12494,8 @@ public class GroupMetadataManagerTest {
|
|||
.setPreviousMemberEpoch(9)
|
||||
.setClientId(DEFAULT_CLIENT_ID)
|
||||
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
|
||||
.setSubscribedTopicNames(List.of("foo", "bar"))
|
||||
.setServerAssignorName("range")
|
||||
.setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
|
||||
.setServerAssignorName(NoOpPartitionAssignor.NAME)
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
|
@ -12512,8 +12513,8 @@ public class GroupMetadataManagerTest {
|
|||
.setPreviousMemberEpoch(9)
|
||||
.setClientId(DEFAULT_CLIENT_ID)
|
||||
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
|
||||
.setSubscribedTopicNames(List.of("foo"))
|
||||
.setServerAssignorName("range")
|
||||
.setSubscribedTopicNames(List.of(fooTopicName))
|
||||
.setServerAssignorName(NoOpPartitionAssignor.NAME)
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 3, 4, 5)))
|
||||
|
@ -12524,12 +12525,14 @@ public class GroupMetadataManagerTest {
|
|||
.addTopic(barTopicId, barTopicName, 2)
|
||||
.addRacks()
|
||||
.buildCoordinatorMetadataImage();
|
||||
long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
|
||||
long barTopicHash = computeTopicHash(barTopicName, metadataImage);
|
||||
|
||||
// Consumer group with two members.
|
||||
// Member 1 uses the classic protocol and static member 2 uses the consumer protocol.
|
||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
|
||||
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
|
||||
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new NoOpPartitionAssignor()))
|
||||
.withMetadataImage(metadataImage)
|
||||
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
||||
.withMember(member1)
|
||||
|
@ -12549,12 +12552,19 @@ public class GroupMetadataManagerTest {
|
|||
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
|
||||
|
||||
// A new member using classic protocol with the same instance id joins, scheduling the downgrade.
|
||||
byte[] protocolsMetadata2 = Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
isSubscriptionChanged ? List.of(fooTopicName, barTopicName) : List.of(fooTopicName))));
|
||||
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
|
||||
protocols2.add(new JoinGroupRequestProtocol()
|
||||
.setName(NoOpPartitionAssignor.NAME)
|
||||
.setMetadata(protocolsMetadata2));
|
||||
JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
|
||||
.withGroupId(groupId)
|
||||
.withMemberId(UNKNOWN_MEMBER_ID)
|
||||
.withGroupInstanceId(instanceId)
|
||||
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
|
||||
.withDefaultProtocolTypeAndProtocols()
|
||||
.withProtocols(protocols2)
|
||||
.build();
|
||||
GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest);
|
||||
result.appendFuture.complete(null);
|
||||
|
@ -12566,14 +12576,15 @@ public class GroupMetadataManagerTest {
|
|||
.build();
|
||||
ConsumerGroupMember expectedNewClassicMember2 = new ConsumerGroupMember.Builder(oldMember2, newMemberId2)
|
||||
.setPreviousMemberEpoch(0)
|
||||
.setMemberEpoch(isSubscriptionChanged ? 11 : 10)
|
||||
.setSubscribedTopicNames(isSubscriptionChanged ? List.of(fooTopicName, barTopicName) : List.of(fooTopicName))
|
||||
.setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
|
||||
.setSupportedProtocols(List.of(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
|
||||
.setName("range")
|
||||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
List.of(fooTopicName)))))))
|
||||
.setName(NoOpPartitionAssignor.NAME)
|
||||
.setMetadata(protocolsMetadata2)))
|
||||
).build();
|
||||
|
||||
byte[] assignment1 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of(
|
||||
|
@ -12600,7 +12611,7 @@ public class GroupMetadataManagerTest {
|
|||
context.time,
|
||||
10,
|
||||
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
|
||||
Optional.of("range"),
|
||||
Optional.of(NoOpPartitionAssignor.NAME),
|
||||
Optional.of(memberId1),
|
||||
Optional.of(context.time.milliseconds())
|
||||
);
|
||||
|
@ -12636,8 +12647,7 @@ public class GroupMetadataManagerTest {
|
|||
assertTrue(Set.of(memberId1, newMemberId2).contains(leader));
|
||||
expectedClassicGroup.setLeaderId(Optional.of(leader));
|
||||
|
||||
assertUnorderedRecordsEquals(
|
||||
List.of(
|
||||
List<List<CoordinatorRecord>> replacingRecords = List.of(
|
||||
// Remove the existing member 2 that uses the consumer protocol.
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, oldMemberId2)),
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, oldMemberId2)),
|
||||
|
@ -12646,12 +12656,26 @@ public class GroupMetadataManagerTest {
|
|||
// Create the new member 2 that uses the consumer protocol.
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewConsumerMember2)),
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewConsumerMember2)),
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewConsumerMember2))
|
||||
);
|
||||
|
||||
// Update the new member 2 to the member that uses classic protocol.
|
||||
List<List<CoordinatorRecord>> memberUpdateRecords;
|
||||
if (isSubscriptionChanged) {
|
||||
memberUpdateRecords = List.of(
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewClassicMember2)),
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewClassicMember2)),
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
|
||||
fooTopicName, fooTopicHash,
|
||||
barTopicName, barTopicHash
|
||||
))))
|
||||
);
|
||||
} else {
|
||||
memberUpdateRecords = List.of(
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewClassicMember2)),
|
||||
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewClassicMember2))
|
||||
);
|
||||
}
|
||||
|
||||
List<List<CoordinatorRecord>> downgradeRecords = List.of(
|
||||
// Remove member 1, member 2 and the consumer group.
|
||||
List.of(
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
|
||||
|
@ -12671,7 +12695,12 @@ public class GroupMetadataManagerTest {
|
|||
|
||||
// Create the classic group.
|
||||
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments))
|
||||
),
|
||||
);
|
||||
|
||||
assertUnorderedRecordsEquals(
|
||||
Stream.of(replacingRecords, memberUpdateRecords, downgradeRecords)
|
||||
.flatMap(List::stream)
|
||||
.collect(Collectors.toList()),
|
||||
result.records
|
||||
);
|
||||
|
||||
|
@ -12681,10 +12710,14 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
assertNotNull(heartbeatTimeout);
|
||||
|
||||
// No rebalance is triggered.
|
||||
// If the subscription is changed, a rebalance is triggered.
|
||||
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
|
||||
if (isSubscriptionChanged) {
|
||||
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
|
||||
} else {
|
||||
assertTrue(classicGroup.isInState(STABLE));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
|
||||
|
|
Loading…
Reference in New Issue