KAFKA-14509; [1/2] Define ConsumerGroupDescribe API request and response schemas and classes. (#14124)

This patch adds the schemas of the new ConsumerGroupDescribe API (KIP-848) and adds an handler to the KafkaApis.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Max Riedel 2023-09-07 17:05:04 +02:00 committed by GitHub
parent 6d762480c9
commit 90e646052a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 456 additions and 1 deletions

View File

@ -111,7 +111,8 @@ public enum ApiKeys {
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true),
CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT);
CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT),
CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);

View File

@ -312,6 +312,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return AllocateProducerIdsRequest.parse(buffer, apiVersion);
case CONSUMER_GROUP_HEARTBEAT:
return ConsumerGroupHeartbeatRequest.parse(buffer, apiVersion);
case CONSUMER_GROUP_DESCRIBE:
return ConsumerGroupDescribeRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -249,6 +249,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return AllocateProducerIdsResponse.parse(responseBuffer, version);
case CONSUMER_GROUP_HEARTBEAT:
return ConsumerGroupHeartbeatResponse.parse(responseBuffer, version);
case CONSUMER_GROUP_DESCRIBE:
return ConsumerGroupDescribeResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
public class ConsumerGroupDescribeRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ConsumerGroupDescribeRequest> {
private final ConsumerGroupDescribeRequestData data;
public Builder(ConsumerGroupDescribeRequestData data) {
this(data, false);
}
public Builder(ConsumerGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.CONSUMER_GROUP_DESCRIBE, enableUnstableLastVersion);
this.data = data;
}
@Override
public ConsumerGroupDescribeRequest build(short version) {
return new ConsumerGroupDescribeRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ConsumerGroupDescribeRequestData data;
public ConsumerGroupDescribeRequest(ConsumerGroupDescribeRequestData data, short version) {
super(ApiKeys.CONSUMER_GROUP_DESCRIBE, version);
this.data = data;
}
@Override
public ConsumerGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ConsumerGroupDescribeResponseData data = new ConsumerGroupDescribeResponseData()
.setThrottleTimeMs(throttleTimeMs);
// Set error for each group
this.data.groupIds().forEach(
groupId -> data.groups().add(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.forException(e).code())
)
);
return new ConsumerGroupDescribeResponse(data);
}
@Override
public ConsumerGroupDescribeRequestData data() {
return data;
}
public static ConsumerGroupDescribeRequest parse(ByteBuffer buffer, short version) {
return new ConsumerGroupDescribeRequest(
new ConsumerGroupDescribeRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
*/
public class ConsumerGroupDescribeResponse extends AbstractResponse {
private final ConsumerGroupDescribeResponseData data;
public ConsumerGroupDescribeResponse(ConsumerGroupDescribeResponseData data) {
super(ApiKeys.CONSUMER_GROUP_DESCRIBE);
this.data = data;
}
@Override
public ConsumerGroupDescribeResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
data.groups().forEach(
group -> updateErrorCounts(counts, Errors.forCode(group.errorCode()))
);
return counts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ConsumerGroupDescribeResponse parse(ByteBuffer buffer, short version) {
return new ConsumerGroupDescribeResponse(
new ConsumerGroupDescribeResponseData(new ByteBufferAccessor(buffer), version)
);
}
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 69,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ConsumerGroupDescribeRequest",
"validVersions": "0",
// The ConsumerGroupDescribe API is added as part of KIP-848 and is still
// under development. Hence, the API is not exposed by default by brokers
// unless explicitly enabled.
"latestVersionUnstable": true,
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The ids of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include authorized operations." }
]
}

View File

@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 69,
"type": "response",
"name": "ConsumerGroupDescribeResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
"about": "Each described group.",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The describe error, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group ID string." },
{ "name": "GroupState", "type": "string", "versions": "0+",
"about": "The group state string, or the empty string." },
{ "name": "GroupEpoch", "type": "int32", "versions": "0+",
"about": "The group epoch." },
{ "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
"about": "The assignment epoch." },
{ "name": "AssignorName", "type": "string", "versions": "0+",
"about": "The selected assignor." },
{ "name": "Members", "type": "[]Member", "versions": "0+",
"about": "The members.",
"fields": [
{ "name": "MemberId", "type": "uuid", "versions": "0+",
"about": "The member ID." },
{ "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member instance ID." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member rack ID." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch." },
{ "name": "ClientId", "type": "string", "versions": "0+",
"about": "The client ID." },
{ "name": "ClientHost", "type": "string", "versions": "0+",
"about": "The client host." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+",
"about": "The subscribed topic names." },
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "the subscribed topic regex otherwise or null of not provided." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+",
"about": "The current assignment." },
{ "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
"about": "The target assignment." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
]
}
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "TopicName", "type": "string", "versions": "0+",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]},
{ "name": "Assignment", "versions": "0+", "fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The assigned topic-partitions to the member." },
{ "name": "Error", "type": "int8", "versions": "0+",
"about": "The assigned error." },
{ "name": "MetadataVersion", "type": "int32", "versions": "0+",
"about": "The assignor metadata version." },
{ "name": "MetadataBytes", "type": "bytes", "versions": "0+",
"about": "The assignor metadata bytes." }
]}
]
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ConsumerGroupDescribeRequestTest {
@Test
void testGetErrorResponse() {
List<String> groupIds = Arrays.asList("group0", "group1");
ConsumerGroupDescribeRequestData data = new ConsumerGroupDescribeRequestData();
data.groupIds().addAll(groupIds);
ConsumerGroupDescribeRequest request = new ConsumerGroupDescribeRequest.Builder(data, true)
.build();
Throwable e = Errors.GROUP_AUTHORIZATION_FAILED.exception();
int throttleTimeMs = 1000;
ConsumerGroupDescribeResponse response = request.getErrorResponse(throttleTimeMs, e);
assertEquals(throttleTimeMs, response.throttleTimeMs());
for (int i = 0; i < groupIds.size(); i++) {
ConsumerGroupDescribeResponseData.DescribedGroup group = response.data().groups().get(i);
assertEquals(groupIds.get(i), group.groupId());
assertEquals(Errors.forException(e).code(), group.errorCode());
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ConsumerGroupDescribeResponseTest {
@Test
void testErrorCounts() {
Errors e = Errors.INVALID_GROUP_ID;
int errorCount = 2;
ConsumerGroupDescribeResponseData data = new ConsumerGroupDescribeResponseData();
for (int i = 0; i < errorCount; i++) {
data.groups().add(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setErrorCode(e.code())
);
}
ConsumerGroupDescribeResponse response = new ConsumerGroupDescribeResponse(data);
Map<Errors, Integer> counts = response.errorCounts();
assertEquals(errorCount, counts.get(e));
assertNull(counts.get(Errors.COORDINATOR_NOT_AVAILABLE));
}
}

View File

@ -67,6 +67,8 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerHeartbeatResponseData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.BrokerRegistrationResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
@ -1057,6 +1059,7 @@ public class RequestResponseTest {
case LIST_TRANSACTIONS: return createListTransactionsRequest(version);
case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsRequest(version);
case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatRequest(version);
case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1132,10 +1135,35 @@ public class RequestResponseTest {
case LIST_TRANSACTIONS: return createListTransactionsResponse();
case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsResponse();
case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatResponse();
case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
private ConsumerGroupDescribeRequest createConsumerGroupDescribeRequest(short version) {
ConsumerGroupDescribeRequestData data = new ConsumerGroupDescribeRequestData()
.setGroupIds(Collections.singletonList("group"))
.setIncludeAuthorizedOperations(false);
return new ConsumerGroupDescribeRequest.Builder(data).build(version);
}
private ConsumerGroupDescribeResponse createConsumerGroupDescribeResponse() {
ConsumerGroupDescribeResponseData data = new ConsumerGroupDescribeResponseData()
.setGroups(Collections.singletonList(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("group")
.setErrorCode((short) 0)
.setErrorMessage(Errors.forCode((short) 0).message())
.setGroupState(ConsumerGroupState.EMPTY.toString())
.setGroupEpoch(0)
.setAssignmentEpoch(0)
.setAssignorName("range")
.setMembers(new ArrayList<ConsumerGroupDescribeResponseData.Member>(0))
))
.setThrottleTimeMs(1000);
return new ConsumerGroupDescribeResponse(data);
}
private ConsumerGroupHeartbeatRequest createConsumerGroupHeartbeatRequest(short version) {
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
.setGroupId("group")

View File

@ -96,6 +96,7 @@ object RequestConvertToJson {
case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version)
case req: ConsumerGroupHeartbeatRequest => ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
case req: ConsumerGroupDescribeRequest => ConsumerGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
"code should be updated to do so.");
}
@ -172,6 +173,7 @@ object RequestConvertToJson {
case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version)
case res: ConsumerGroupHeartbeatResponse => ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
case res: ConsumerGroupDescribeResponse => ConsumerGroupDescribeResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
"code should be updated to do so.");
}

View File

@ -242,6 +242,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
@ -3721,6 +3722,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordConversionStats): Unit = {

View File

@ -48,6 +48,7 @@ import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigC
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => LAlterableConfig}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource}
@ -6210,4 +6211,21 @@ class KafkaApisTest {
val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
@Test
def testConsumerGroupDescribeReturnsUnsupportedVersion(): Unit = {
val groupId = "group0"
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
consumerGroupDescribeRequestData.groupIds.add(groupId)
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
val errorCode = Errors.UNSUPPORTED_VERSION.code
val expectedDescribedGroup = new DescribedGroup().setGroupId(groupId).setErrorCode(errorCode)
val expectedResponse = new ConsumerGroupDescribeResponseData()
expectedResponse.groups.add(expectedDescribedGroup)
createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)
val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
assertEquals(expectedResponse, response.data)
}
}

View File

@ -710,6 +710,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CONSUMER_GROUP_HEARTBEAT =>
new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData(), true)
case ApiKeys.CONSUMER_GROUP_DESCRIBE =>
new ConsumerGroupDescribeRequest.Builder(new ConsumerGroupDescribeRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}