diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 97c382ca87f..48dfae27892 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -111,7 +111,8 @@ public enum ApiKeys { DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true), - CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT); + CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT), + CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 64f1c2e4a2f..b81ccea014e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -312,6 +312,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse { return AllocateProducerIdsRequest.parse(buffer, apiVersion); case CONSUMER_GROUP_HEARTBEAT: return ConsumerGroupHeartbeatRequest.parse(buffer, apiVersion); + case CONSUMER_GROUP_DESCRIBE: + return ConsumerGroupDescribeRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 4b55a6d582d..bb033806329 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -249,6 +249,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse { return AllocateProducerIdsResponse.parse(responseBuffer, version); case CONSUMER_GROUP_HEARTBEAT: return ConsumerGroupHeartbeatResponse.parse(responseBuffer, version); + case CONSUMER_GROUP_DESCRIBE: + return ConsumerGroupDescribeResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java new file mode 100644 index 00000000000..862d9c9d4e4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class ConsumerGroupDescribeRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + + private final ConsumerGroupDescribeRequestData data; + + public Builder(ConsumerGroupDescribeRequestData data) { + this(data, false); + } + + public Builder(ConsumerGroupDescribeRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.CONSUMER_GROUP_DESCRIBE, enableUnstableLastVersion); + this.data = data; + } + + @Override + public ConsumerGroupDescribeRequest build(short version) { + return new ConsumerGroupDescribeRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final ConsumerGroupDescribeRequestData data; + + public ConsumerGroupDescribeRequest(ConsumerGroupDescribeRequestData data, short version) { + super(ApiKeys.CONSUMER_GROUP_DESCRIBE, version); + this.data = data; + } + + @Override + public ConsumerGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ConsumerGroupDescribeResponseData data = new ConsumerGroupDescribeResponseData() + .setThrottleTimeMs(throttleTimeMs); + // Set error for each group + this.data.groupIds().forEach( + groupId -> data.groups().add( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.forException(e).code()) + ) + ); + return new ConsumerGroupDescribeResponse(data); + } + + @Override + public ConsumerGroupDescribeRequestData data() { + return data; + } + + public static ConsumerGroupDescribeRequest parse(ByteBuffer buffer, short version) { + return new ConsumerGroupDescribeRequest( + new ConsumerGroupDescribeRequestData(new ByteBufferAccessor(buffer), version), + version + ); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponse.java new file mode 100644 index 00000000000..70456e7b024 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponse.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#INVALID_GROUP_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + */ +public class ConsumerGroupDescribeResponse extends AbstractResponse { + + private final ConsumerGroupDescribeResponseData data; + + public ConsumerGroupDescribeResponse(ConsumerGroupDescribeResponseData data) { + super(ApiKeys.CONSUMER_GROUP_DESCRIBE); + this.data = data; + } + + @Override + public ConsumerGroupDescribeResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + HashMap counts = new HashMap<>(); + data.groups().forEach( + group -> updateErrorCounts(counts, Errors.forCode(group.errorCode())) + ); + return counts; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static ConsumerGroupDescribeResponse parse(ByteBuffer buffer, short version) { + return new ConsumerGroupDescribeResponse( + new ConsumerGroupDescribeResponseData(new ByteBufferAccessor(buffer), version) + ); + } +} diff --git a/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json new file mode 100644 index 00000000000..1d7842a1206 --- /dev/null +++ b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 69, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "ConsumerGroupDescribeRequest", + "validVersions": "0", + // The ConsumerGroupDescribe API is added as part of KIP-848 and is still + // under development. Hence, the API is not exposed by default by brokers + // unless explicitly enabled. + "latestVersionUnstable": true, + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", + "about": "The ids of the groups to describe" }, + { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", + "about": "Whether to include authorized operations." } + ] +} diff --git a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json new file mode 100644 index 00000000000..1dea4e2ead6 --- /dev/null +++ b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 69, + "type": "response", + "name": "ConsumerGroupDescribeResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", + "about": "Each described group.", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The describe error, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group ID string." }, + { "name": "GroupState", "type": "string", "versions": "0+", + "about": "The group state string, or the empty string." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", + "about": "The assignment epoch." }, + { "name": "AssignorName", "type": "string", "versions": "0+", + "about": "The selected assignor." }, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", + "fields": [ + { "name": "MemberId", "type": "uuid", "versions": "0+", + "about": "The member ID." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member instance ID." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member rack ID." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch." }, + { "name": "ClientId", "type": "string", "versions": "0+", + "about": "The client ID." }, + { "name": "ClientHost", "type": "string", "versions": "0+", + "about": "The client host." }, + { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", + "about": "The subscribed topic names." }, + { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "the subscribed topic regex otherwise or null of not provided." }, + { "name": "Assignment", "type": "Assignment", "versions": "0+", + "about": "The current assignment." }, + { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", + "about": "The target assignment." } + ]}, + { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648", + "about": "32-bit bitfield to represent authorized operations for this group." } + ] + } + ], + "commonStructs": [ + { "name": "TopicPartitions", "versions": "0+", "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The topic ID." }, + { "name": "TopicName", "type": "string", "versions": "0+", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions." } + ]}, + { "name": "Assignment", "versions": "0+", "fields": [ + { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", + "about": "The assigned topic-partitions to the member." }, + { "name": "Error", "type": "int8", "versions": "0+", + "about": "The assigned error." }, + { "name": "MetadataVersion", "type": "int32", "versions": "0+", + "about": "The assignor metadata version." }, + { "name": "MetadataBytes", "type": "bytes", "versions": "0+", + "about": "The assignor metadata bytes." } + ]} + ] +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequestTest.java new file mode 100644 index 00000000000..81255da2b11 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequestTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConsumerGroupDescribeRequestTest { + + @Test + void testGetErrorResponse() { + List groupIds = Arrays.asList("group0", "group1"); + ConsumerGroupDescribeRequestData data = new ConsumerGroupDescribeRequestData(); + data.groupIds().addAll(groupIds); + ConsumerGroupDescribeRequest request = new ConsumerGroupDescribeRequest.Builder(data, true) + .build(); + Throwable e = Errors.GROUP_AUTHORIZATION_FAILED.exception(); + int throttleTimeMs = 1000; + + ConsumerGroupDescribeResponse response = request.getErrorResponse(throttleTimeMs, e); + + assertEquals(throttleTimeMs, response.throttleTimeMs()); + for (int i = 0; i < groupIds.size(); i++) { + ConsumerGroupDescribeResponseData.DescribedGroup group = response.data().groups().get(i); + assertEquals(groupIds.get(i), group.groupId()); + assertEquals(Errors.forException(e).code(), group.errorCode()); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponseTest.java new file mode 100644 index 00000000000..ed3d87bd963 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponseTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ConsumerGroupDescribeResponseTest { + + @Test + void testErrorCounts() { + Errors e = Errors.INVALID_GROUP_ID; + int errorCount = 2; + ConsumerGroupDescribeResponseData data = new ConsumerGroupDescribeResponseData(); + for (int i = 0; i < errorCount; i++) { + data.groups().add( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setErrorCode(e.code()) + ); + } + ConsumerGroupDescribeResponse response = new ConsumerGroupDescribeResponse(data); + + Map counts = response.errorCounts(); + + assertEquals(errorCount, counts.get(e)); + assertNull(counts.get(Errors.COORDINATOR_NOT_AVAILABLE)); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c8bd3563b53..d196aa45a3a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -67,6 +67,8 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData; import org.apache.kafka.common.message.BrokerHeartbeatResponseData; import org.apache.kafka.common.message.BrokerRegistrationRequestData; import org.apache.kafka.common.message.BrokerRegistrationResponseData; +import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ControlledShutdownRequestData; @@ -1057,6 +1059,7 @@ public class RequestResponseTest { case LIST_TRANSACTIONS: return createListTransactionsRequest(version); case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsRequest(version); case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatRequest(version); + case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1132,10 +1135,35 @@ public class RequestResponseTest { case LIST_TRANSACTIONS: return createListTransactionsResponse(); case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsResponse(); case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatResponse(); + case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } + private ConsumerGroupDescribeRequest createConsumerGroupDescribeRequest(short version) { + ConsumerGroupDescribeRequestData data = new ConsumerGroupDescribeRequestData() + .setGroupIds(Collections.singletonList("group")) + .setIncludeAuthorizedOperations(false); + return new ConsumerGroupDescribeRequest.Builder(data).build(version); + } + + private ConsumerGroupDescribeResponse createConsumerGroupDescribeResponse() { + ConsumerGroupDescribeResponseData data = new ConsumerGroupDescribeResponseData() + .setGroups(Collections.singletonList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId("group") + .setErrorCode((short) 0) + .setErrorMessage(Errors.forCode((short) 0).message()) + .setGroupState(ConsumerGroupState.EMPTY.toString()) + .setGroupEpoch(0) + .setAssignmentEpoch(0) + .setAssignorName("range") + .setMembers(new ArrayList(0)) + )) + .setThrottleTimeMs(1000); + return new ConsumerGroupDescribeResponse(data); + } + private ConsumerGroupHeartbeatRequest createConsumerGroupHeartbeatRequest(short version) { ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData() .setGroupId("group") diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index 771a86e8f0f..895f9d9b944 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -96,6 +96,7 @@ object RequestConvertToJson { case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version) case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version) case req: ConsumerGroupHeartbeatRequest => ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version) + case req: ConsumerGroupDescribeRequest => ConsumerGroupDescribeRequestDataJsonConverter.write(req.data, request.version) case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + "code should be updated to do so."); } @@ -172,6 +173,7 @@ object RequestConvertToJson { case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version) case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version) case res: ConsumerGroupHeartbeatResponse => ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version) + case res: ConsumerGroupDescribeResponse => ConsumerGroupDescribeResponseDataJsonConverter.write(res.data, version) case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + "code should be updated to do so."); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ab4bbe96237..fb4b138192e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -242,6 +242,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError) + case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3721,6 +3722,11 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { + requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } + private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordConversionStats): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 50ba898b25c..7b398622cc3 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -48,6 +48,7 @@ import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigC import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => LAlterableConfig} import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse} import org.apache.kafka.common.message.ApiMessageType.ListenerType +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource} @@ -6210,4 +6211,21 @@ class KafkaApisTest { val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + + @Test + def testConsumerGroupDescribeReturnsUnsupportedVersion(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + val errorCode = Errors.UNSUPPORTED_VERSION.code + val expectedDescribedGroup = new DescribedGroup().setGroupId(groupId).setErrorCode(errorCode) + val expectedResponse = new ConsumerGroupDescribeResponseData() + expectedResponse.groups.add(expectedDescribedGroup) + + createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching) + val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) + + assertEquals(expectedResponse, response.data) + } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 6e09a3a90c1..1943f22563b 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -710,6 +710,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CONSUMER_GROUP_HEARTBEAT => new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData(), true) + case ApiKeys.CONSUMER_GROUP_DESCRIBE => + new ConsumerGroupDescribeRequest.Builder(new ConsumerGroupDescribeRequestData(), true) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) }