KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533)

This patch fixes a bug in the logic which decides when a full ConsumerGroupHeartbeat response must be returned to the client. Prior to it, the logic only relies on the `ownedTopicPartitions` field to check whether the response was a full response. This is not enough because `ownedTopicPartitions` is also set in different situations. This patch changes the logic to check `ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs` as they are the only three non optional fields.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2024-03-19 20:48:41 +00:00 committed by GitHub
parent bf3f088c94
commit c66d66dc67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 105 additions and 20 deletions

View File

@ -1227,10 +1227,13 @@ public class GroupMetadataManager {
.setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
// The assignment is only provided in the following cases:
// 1. The member reported its owned partitions;
// 2. The member just joined or rejoined to group (epoch equals to zero);
// 3. The member's assignment has been updated.
if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
// 1. The member sent a full request. It does so when joining or rejoining the group with zero
// as the member epoch; or on any errors (e.g. timeout). We use all the non-optional fields
// (rebalanceTimeoutMs, subscribedTopicNames and ownedTopicPartitions) to detect a full request
// as those must be set in a full request.
// 2. The member's assignment has been updated.
boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);
if (isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createResponseAssignment(updatedMember));
}

View File

@ -1650,6 +1650,102 @@ public class GroupMetadataManagerTest {
.setTopicPartitions(Collections.emptyList())));
}
@Test
public void testConsumerGroupHeartbeatFullResponse() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
// Create a context with an empty consumer group.
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.addRacks()
.build())
.build();
// Prepare new assignment for the group.
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
}
}
));
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result;
// A full response should be sent back on joining.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.singletonList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1))))),
result.response()
);
// Otherwise, a partial response should be sent back.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().memberEpoch()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000),
result.response()
);
// A full response should be sent back when the member sends
// a full request again.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().memberEpoch())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.singletonList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1))))),
result.response()
);
}
@Test
public void testReconciliationProcess() {
String groupId = "fooup";
@ -1904,16 +2000,7 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Collections.singletonList(0))
))),
.setHeartbeatIntervalMs(5000),
result.response()
);
@ -3057,12 +3144,7 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.singletonList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1))))),
.setHeartbeatIntervalMs(5000),
result.response()
);