KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)
CI / build (push) Waiting to run Details

When a consumer protocol static member replaces an existing member in a
classic group, it's not necessary to recompute the assignment. However,
it happens anyway.

In

[ConsumerGroup.fromClassicGroup](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java (L1140)),
we don't set the group's subscriptionMetadata.  Later in the consumer
group heartbeat, we [call

updateSubscriptionMetadata](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L1748)),
which [notices that the group's subscriptionMetadata needs an

update](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L2757))
and bumps the epoch. Since the epoch is bumped, we [recompute the

assignment](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L1766)).

As a fix, this patch sets the subscriptionMetadata in
ConsumerGroup.fromClassicGroup.

Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Dongnuo Lyu 2025-05-27 05:25:57 -04:00 committed by David Jacot
parent 3170e1130c
commit e9c5069be7
4 changed files with 49 additions and 43 deletions

View File

@ -1115,7 +1115,8 @@ public class GroupMetadataManager {
snapshotRegistry,
metrics,
classicGroup,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);
} catch (SchemaException e) {
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +

View File

@ -43,6 +43,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@ -1129,7 +1130,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
* @param snapshotRegistry The SnapshotRegistry.
* @param metrics The GroupCoordinatorMetricsShard.
* @param classicGroup The converted classic group.
* @param topicsImage The TopicsImage for topic id and topic name conversion.
* @param topicsImage The current metadata for all available topics.
* @param clusterImage The current metadata for the Kafka cluster.
* @return The created ConsumerGroup.
*
* @throws SchemaException if any member's subscription or assignment cannot be deserialized.
@ -1139,7 +1141,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
ClassicGroup classicGroup,
TopicsImage topicsImage
TopicsImage topicsImage,
ClusterImage clusterImage
) {
String groupId = classicGroup.groupId();
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
@ -1195,6 +1198,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
consumerGroup.updateMember(newMember);
});
consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata(
consumerGroup.subscribedTopicNames(),
topicsImage,
clusterImage
));
return consumerGroup;
}
@ -1210,6 +1219,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember))
);
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(), subscriptionMetadata()));
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch()));
members().forEach((consumerGroupMemberId, consumerGroupMember) ->

View File

@ -9815,6 +9815,10 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
@ -9823,12 +9827,6 @@ public class GroupMetadataManagerTest {
// Member 2 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)),
@ -10030,6 +10028,11 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@ -10042,12 +10045,6 @@ public class GroupMetadataManagerTest {
// Member 3 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
@ -10235,7 +10232,7 @@ public class GroupMetadataManagerTest {
);
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(COMPLETING_REBALANCE);
group.initNextGeneration();
group.transitionTo(STABLE);
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments));
@ -10257,8 +10254,8 @@ public class GroupMetadataManagerTest {
ConsumerGroupMember expectedClassicMember = new ConsumerGroupMember.Builder(memberId)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
.setMemberEpoch(group.generationId())
.setPreviousMemberEpoch(group.generationId())
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(List.of(fooTopicName))
@ -10294,7 +10291,7 @@ public class GroupMetadataManagerTest {
.build();
ConsumerGroupMember expectedFinalConsumerMember = new ConsumerGroupMember.Builder(expectedReplacingConsumerMember)
.setMemberEpoch(1)
.setMemberEpoch(group.generationId())
.setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(5000)
.setClassicMemberMetadata(null)
@ -10306,9 +10303,10 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with the static member.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, group.generationId()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, group.generationId()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember),
// Remove the static member because the rejoining member replaces it.
@ -10321,17 +10319,10 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedReplacingConsumerMember),
// The static member rejoins the new consumer group.
// The static member rejoins the new consumer group with the same instance id and
// takes the assignment of the previous member. No new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedFinalConsumerMember),
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
// Newly joining static member bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
// The newly created static member takes the assignment from the existing member.
// Bump its member epoch and transition to STABLE.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedFinalConsumerMember)
@ -10432,6 +10423,10 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -10440,12 +10435,6 @@ public class GroupMetadataManagerTest {
// Member 2 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Collections.emptyMap()),
@ -10817,6 +10806,11 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@ -10829,12 +10823,6 @@ public class GroupMetadataManagerTest {
// Member 3 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),

View File

@ -1533,7 +1533,8 @@ public class ConsumerGroupTest {
new SnapshotRegistry(logContext),
mock(GroupCoordinatorMetricsShard.class),
classicGroup,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
@ -1546,6 +1547,10 @@ public class ConsumerGroupTest {
expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
expectedConsumerGroup.setSubscriptionMetadata(Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
));
expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId)
.setMemberEpoch(classicGroup.generationId())
.setState(MemberState.STABLE)
@ -1577,6 +1582,7 @@ public class ConsumerGroupTest {
assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch());
assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata()));
assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
}