MINOR: Fix response for consumer group describe with empty group id (#20030) (#20036)
CI / build (push) Waiting to run Details

ConsumerGroupDescribe with an empty group id returns a response containing `null` groupId in a non-nullable field. Since the response cannot be serialized, this results in UNKNOWN_SERVER_ERROR being returned to the client. This PR sets the group id in the response to an empty string instead and adds request tests for empty group id.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Rajini Sivaram 2025-06-25 17:56:58 +01:00 committed by GitHub
parent 15ec053665
commit 6351bc05aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 137 additions and 12 deletions

View File

@ -211,6 +211,20 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
) )
assertEquals(expected, actual) assertEquals(expected, actual)
val unknownGroupResponse = consumerGroupDescribe(
groupIds = List("grp-unknown"),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.GROUP_ID_NOT_FOUND.code, unknownGroupResponse.head.errorCode())
val emptyGroupResponse = consumerGroupDescribe(
groupIds = List(""),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.INVALID_GROUP_ID.code, emptyGroupResponse.head.errorCode())
} }
} finally { } finally {
admin.close() admin.close()

View File

@ -301,6 +301,48 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
} }
} }
@ClusterTest
def testEmptyConsumerGroupId(): Unit = {
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("")
.setMemberId(Uuid.randomUuid().toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code
}, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(Errors.INVALID_REQUEST.code, consumerGroupHeartbeatResponse.data.errorCode)
assertEquals("GroupId can't be empty.", consumerGroupHeartbeatResponse.data.errorMessage)
} finally {
admin.close()
}
}
@ClusterTest @ClusterTest
def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = { def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = {
val admin = cluster.admin() val admin = cluster.admin()

View File

@ -99,8 +99,8 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
) )
deleteGroups( deleteGroups(
groupIds = List("grp-non-empty", "grp"), groupIds = List("grp-non-empty", "grp", ""),
expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE), expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE, Errors.GROUP_ID_NOT_FOUND),
version = version.toShort version = version.toShort
) )

View File

@ -104,10 +104,15 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
.setGroupId("grp-unknown") .setGroupId("grp-unknown")
.setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist. .setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist.
.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code()) .setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code())
.setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null) .setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null),
new DescribedGroup()
.setGroupId("")
.setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist.
.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code())
.setErrorMessage(if (version >= 6) "Group not found." else null)
), ),
describeGroups( describeGroups(
groupIds = List("grp-1", "grp-2", "grp-unknown"), groupIds = List("grp-1", "grp-2", "grp-unknown", ""),
version = version.toShort version = version.toShort
) )
) )

View File

@ -190,6 +190,15 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
expectedError = Errors.UNKNOWN_MEMBER_ID, expectedError = Errors.UNKNOWN_MEMBER_ID,
version = version.toShort version = version.toShort
) )
// Heartbeat with empty group id.
heartbeat(
groupId = "",
memberId = leaderMemberId,
generationId = -1,
expectedError = Errors.INVALID_GROUP_ID,
version = version.toShort
)
} }
} }
} }

View File

@ -149,6 +149,17 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
) )
) )
// Join with an empty group id.
verifyJoinGroupResponseDataEquals(
new JoinGroupResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code)
.setProtocolName(if (version >= 7) null else ""),
sendJoinRequest(
groupId = "",
version = version.toShort
)
)
// Join with an inconsistent protocolType. // Join with an inconsistent protocolType.
verifyJoinGroupResponseDataEquals( verifyJoinGroupResponseDataEquals(
new JoinGroupResponseData() new JoinGroupResponseData()

View File

@ -269,6 +269,39 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
) )
) )
// Fetch with empty group id.
assertEquals(
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("")
.setTopics(List(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(List(
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(-1L),
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(1)
.setCommittedOffset(-1L),
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(5)
.setCommittedOffset(-1L)
).asJava)
).asJava),
fetchOffsets(
groupId = "",
memberId = memberId,
memberEpoch = memberEpoch,
partitions = List(
new TopicPartition("foo", 0),
new TopicPartition("foo", 1),
new TopicPartition("foo", 5) // This one does not exist.
),
requireStable = requireStable,
version = version.toShort
)
)
// Fetch with stale member epoch. // Fetch with stale member epoch.
assertEquals( assertEquals(
new OffsetFetchResponseData.OffsetFetchResponseGroup() new OffsetFetchResponseData.OffsetFetchResponseGroup()

View File

@ -76,6 +76,17 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
version = version.toShort version = version.toShort
) )
// Sync with empty group id.
verifySyncGroupWithOldProtocol(
groupId = "",
memberId = "member-id",
generationId = -1,
expectedProtocolType = null,
expectedProtocolName = null,
expectedError = Errors.INVALID_GROUP_ID,
version = version.toShort
)
val metadata = ConsumerProtocol.serializeSubscription( val metadata = ConsumerProtocol.serializeSubscription(
new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo")) new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo"))
).array ).array

View File

@ -636,7 +636,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
} else { } else {
futures.add(CompletableFuture.completedFuture(Collections.singletonList( futures.add(CompletableFuture.completedFuture(Collections.singletonList(
new ConsumerGroupDescribeResponseData.DescribedGroup() new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()) .setErrorCode(Errors.INVALID_GROUP_ID.code())
))); )));
} }
@ -687,7 +687,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
} else { } else {
futures.add(CompletableFuture.completedFuture(Collections.singletonList( futures.add(CompletableFuture.completedFuture(Collections.singletonList(
new ShareGroupDescribeResponseData.DescribedGroup() new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()) .setErrorCode(Errors.INVALID_GROUP_ID.code())
))); )));
} }
@ -736,7 +736,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
if (groupId == null) { if (groupId == null) {
futures.add(CompletableFuture.completedFuture(Collections.singletonList( futures.add(CompletableFuture.completedFuture(Collections.singletonList(
new DescribeGroupsResponseData.DescribedGroup() new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()) .setErrorCode(Errors.INVALID_GROUP_ID.code())
))); )));
} else { } else {

View File

@ -1038,7 +1038,7 @@ public class GroupCoordinatorServiceTest {
.setGroupId(""); .setGroupId("");
List<DescribeGroupsResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( List<DescribeGroupsResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new DescribeGroupsResponseData.DescribedGroup() new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()), .setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup describedGroup
); );
@ -1470,11 +1470,11 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> partitionCount); service.startup(() -> partitionCount);
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()); .setErrorCode(Errors.INVALID_GROUP_ID.code());
List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new ConsumerGroupDescribeResponseData.DescribedGroup() new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()), .setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup describedGroup
); );
@ -2347,11 +2347,11 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> partitionCount); service.startup(() -> partitionCount);
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()); .setErrorCode(Errors.INVALID_GROUP_ID.code());
List<ShareGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( List<ShareGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new ShareGroupDescribeResponseData.DescribedGroup() new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(null) .setGroupId("")
.setErrorCode(Errors.INVALID_GROUP_ID.code()), .setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup describedGroup
); );