MINOR: Fix response for consumer group describe with empty group id (#20030)

ConsumerGroupDescribe with an empty group id returns a response containing `null` groupId in a non-nullable field. Since the response cannot be serialized, this results in UNKNOWN_SERVER_ERROR being returned to the client. This PR sets the group id in the response to an empty string instead and adds request tests for empty group id.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Rajini Sivaram 2025-06-25 10:33:44 +01:00 committed by rajinisivaram
parent fb054b590e
commit 85f9e93933
10 changed files with 143 additions and 15 deletions

View File

@ -207,6 +207,20 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
)
assertEquals(expected, actual)
val unknownGroupResponse = consumerGroupDescribe(
groupIds = List("grp-unknown"),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.GROUP_ID_NOT_FOUND.code, unknownGroupResponse.head.errorCode())
val emptyGroupResponse = consumerGroupDescribe(
groupIds = List(""),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.INVALID_GROUP_ID.code, emptyGroupResponse.head.errorCode())
}
} finally {
admin.close()

View File

@ -301,6 +301,48 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
}
}
@ClusterTest
def testEmptyConsumerGroupId(): Unit = {
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("")
.setMemberId(Uuid.randomUuid().toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code
}, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(Errors.INVALID_REQUEST.code, consumerGroupHeartbeatResponse.data.errorCode)
assertEquals("GroupId can't be empty.", consumerGroupHeartbeatResponse.data.errorMessage)
} finally {
admin.close()
}
}
@ClusterTest
def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = {
val admin = cluster.admin()

View File

@ -78,8 +78,8 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
)
deleteGroups(
groupIds = List("grp-non-empty", "grp"),
expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE),
groupIds = List("grp-non-empty", "grp", ""),
expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE, Errors.GROUP_ID_NOT_FOUND),
version = version.toShort
)

View File

@ -93,10 +93,15 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
.setGroupId("grp-unknown")
.setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist.
.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code())
.setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null)
.setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null),
new DescribedGroup()
.setGroupId("")
.setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist.
.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code())
.setErrorMessage(if (version >= 6) "Group not found." else null)
),
describeGroups(
groupIds = List("grp-1", "grp-2", "grp-unknown"),
groupIds = List("grp-1", "grp-2", "grp-unknown", ""),
version = version.toShort
)
)

View File

@ -179,6 +179,15 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
expectedError = Errors.UNKNOWN_MEMBER_ID,
version = version.toShort
)
// Heartbeat with empty group id.
heartbeat(
groupId = "",
memberId = leaderMemberId,
generationId = -1,
expectedError = Errors.INVALID_GROUP_ID,
version = version.toShort
)
}
}
}

View File

@ -139,6 +139,17 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
)
)
// Join with an empty group id.
verifyJoinGroupResponseDataEquals(
new JoinGroupResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code)
.setProtocolName(if (version >= 7) null else ""),
sendJoinRequest(
groupId = "",
version = version.toShort
)
)
// Join with an inconsistent protocolType.
verifyJoinGroupResponseDataEquals(
new JoinGroupResponseData()

View File

@ -233,6 +233,42 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
)
)
// Fetch with empty group id.
assertEquals(
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("")
.setTopics(List(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName(if (version < 10) "foo" else "")
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setPartitions(List(
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(-1L),
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(1)
.setCommittedOffset(-1L),
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(5)
.setCommittedOffset(-1L)
).asJava)
).asJava),
fetchOffsets(
group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("")
.setMemberId(memberId)
.setMemberEpoch(memberEpoch)
.setTopics(List(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(topicId)
.setPartitionIndexes(List[Integer](0, 1, 5).asJava) // 5 does not exist.
).asJava),
requireStable = requireStable,
version = version.toShort
)
)
// Fetch with stale member epoch.
assertEquals(
new OffsetFetchResponseData.OffsetFetchResponseGroup()

View File

@ -64,6 +64,17 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
version = version.toShort
)
// Sync with empty group id.
verifySyncGroupWithOldProtocol(
groupId = "",
memberId = "member-id",
generationId = -1,
expectedProtocolType = null,
expectedProtocolName = null,
expectedError = Errors.INVALID_GROUP_ID,
version = version.toShort
)
val metadata = ConsumerProtocol.serializeSubscription(
new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo"))
).array

View File

@ -1076,7 +1076,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
} else {
futures.add(CompletableFuture.completedFuture(List.of(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code())
)));
}
@ -1128,7 +1128,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
} else {
futures.add(CompletableFuture.completedFuture(List.of(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code())
)));
}
@ -1180,7 +1180,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
} else {
futures.add(CompletableFuture.completedFuture(List.of(
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code())
)));
}
@ -1262,7 +1262,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
if (groupId == null) {
futures.add(CompletableFuture.completedFuture(List.of(
new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code())
)));
} else {

View File

@ -1558,7 +1558,7 @@ public class GroupCoordinatorServiceTest {
.setGroupId("");
List<DescribeGroupsResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup
);
@ -1953,11 +1953,11 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> partitionCount);
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code());
List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup
);
@ -2091,11 +2091,11 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> partitionCount);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code());
List<StreamsGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup
);
@ -3435,11 +3435,11 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> partitionCount);
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code());
List<ShareGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup
);