diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b96a4db382f..35e225b8956 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -131,7 +131,8 @@ public enum ApiKeys {
     WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
     DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
     READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
-    STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT);
+    STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
+    STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE);
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 9c4b6863b85..c16c71903d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -352,6 +352,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
                 return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion);
             case STREAMS_GROUP_HEARTBEAT:
                 return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
+            case STREAMS_GROUP_DESCRIBE:
+                return StreamsGroupDescribeRequest.parse(buffer, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 5c3af7918f3..8f7b12d4fa4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -289,6 +289,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
                 return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
             case STREAMS_GROUP_HEARTBEAT:
                 return StreamsGroupHeartbeatResponse.parse(responseBuffer, version);
+            case STREAMS_GROUP_DESCRIBE:
+                return StreamsGroupDescribeResponse.parse(responseBuffer, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
new file mode 100644
index 00000000000..1f0c46fafe1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class StreamsGroupDescribeRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<StreamsGroupDescribeRequest> {
+
+        private final StreamsGroupDescribeRequestData data;
+
+        public Builder(StreamsGroupDescribeRequestData data) {
+            this(data, false);
+        }
+
+        public Builder(StreamsGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
+            super(ApiKeys.STREAMS_GROUP_DESCRIBE, enableUnstableLastVersion);
+            this.data = data;
+        }
+
+        @Override
+        public StreamsGroupDescribeRequest build(short version) {
+            return new StreamsGroupDescribeRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final StreamsGroupDescribeRequestData data;
+
+    public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short version) {
+        super(ApiKeys.STREAMS_GROUP_DESCRIBE, version);
+        this.data = data;
+    }
+
+    @Override
+    public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
+            .setThrottleTimeMs(throttleTimeMs);
+        // Set error for each group
+        this.data.groupIds().forEach(
+            groupId -> data.groups().add(
+                new StreamsGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(groupId)
+                    .setErrorCode(Errors.forException(e).code())
+            )
+        );
+        return new StreamsGroupDescribeResponse(data);
+    }
+
+    @Override
+    public StreamsGroupDescribeRequestData data() {
+        return data;
+    }
+
+    public static StreamsGroupDescribeRequest parse(ByteBuffer buffer, short version) {
+        return new StreamsGroupDescribeRequest(
+            new StreamsGroupDescribeRequestData(new ByteBufferAccessor(buffer), version),
+            version
+        );
+    }
+
+    public static List<StreamsGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
+        List<String> groupIds,
+        Errors error
+    ) {
+        return groupIds.stream()
+            .map(groupId -> new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId)
+                .setErrorCode(error.code())
+            ).collect(Collectors.toList());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
new file mode 100644
index 00000000000..83db6700a4a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#INVALID_GROUP_ID}
+ * - {@link Errors#GROUP_ID_NOT_FOUND}
+ */
+public class StreamsGroupDescribeResponse extends AbstractResponse {
+
+    private final StreamsGroupDescribeResponseData data;
+
+    public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data) {
+        super(ApiKeys.STREAMS_GROUP_DESCRIBE);
+        this.data = data;
+    }
+
+    @Override
+    public StreamsGroupDescribeResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        HashMap<Errors, Integer> counts = new HashMap<>();
+        data.groups().forEach(
+            group -> updateErrorCounts(counts, Errors.forCode(group.errorCode()))
+        );
+        return counts;
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    public static StreamsGroupDescribeResponse parse(ByteBuffer buffer, short version) {
+        return new StreamsGroupDescribeResponse(
+            new StreamsGroupDescribeResponseData(new ByteBufferAccessor(buffer), version)
+        );
+    }
+}
diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
new file mode 100644
index 00000000000..96fe0d9a1a6
--- /dev/null
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +{ + "apiKey": 89, + "type": "request", + "listeners": ["broker", "zkBroker"], + "name": "StreamsGroupDescribeRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", + "about": "The ids of the groups to describe" }, + { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", + "about": "Whether to include authorized operations." } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json new file mode 100644 index 00000000000..9cf2954c17f --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 89, + "type": "response", + "name": "StreamsGroupDescribeResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", + "about": "Each described group.", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The describe error, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group ID string." }, + { "name": "GroupState", "type": "string", "versions": "0+", + "about": "The group state string, or the empty string." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", + "about": "The assignment epoch." }, + + { "name": "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The topology metadata currently initialized for the streams application. 
Can be null in case of a describe error.", + "fields": [ + { "name": "Epoch", "type": "int32", "versions": "0+", + "about": "The epoch of the currently initialized topology for this group." }, + { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The subtopologies of the streams application. This contains the configured subtopologies, where the number of partitions are set and any regular expressions are resolved to actual topics. Null if the group is uninitialized, source topics are missing or incorrectly partitioned.", + "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "String to uniquely identify the subtopology." }, + { "name": "SourceTopics", "type": "[]string", "versions": "0+", + "about": "The topics the subtopology reads from." }, + { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", + "about": "The repartition topics the subtopology writes to." }, + { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", + "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, + { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", + "about": "The set of source topics that are internally created repartition topics. Created automatically." } + ]} + ]}, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", + "fields": [ + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member ID." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member instance ID for static membership." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The rack ID." }, + + { "name": "ClientId", "type": "string", "versions": "0+", + "about": "The client ID." }, + { "name": "ClientHost", "type": "string", "versions": "0+", + "about": "The client host." }, + + { "name": "TopologyEpoch", "type": "int32", "versions": "0+", + "about": "The epoch of the topology on the client." }, + + { "name": "ProcessId", "type": "string", "versions": "0+", + "about": "Identity of the streams instance that may have multiple clients. " }, + { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "User-defined endpoint for Interactive Queries. Null if not defined for this client." }, + { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", + "about": "Used for rack-aware assignment algorithm." }, + { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+", + "about": "Cumulative changelog offsets for tasks." }, + { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+", + "about": "Cumulative changelog end offsets for tasks." }, + + { "name": "Assignment", "type": "Assignment", "versions": "0+", + "about": "The current assignment." }, + { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", + "about": "The target assignment." }, + { "name": "IsClassic", "type": "bool", "versions": "0+", + "about": "True for classic members that have not been upgraded yet." 
} + ]}, + { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", + "about": "32-bit bitfield to represent authorized operations for this group." } + ] + } + ], + "commonStructs": [ + { "name": "Endpoint", "versions": "0+", "fields": [ + { "name": "Host", "type": "string", "versions": "0+", + "about": "host of the endpoint" }, + { "name": "Port", "type": "uint16", "versions": "0+", + "about": "port of the endpoint" } + ]}, + { "name": "TaskOffset", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, + { "name": "Partition", "type": "int32", "versions": "0+", + "about": "The partition." }, + { "name": "Offset", "type": "int64", "versions": "0+", + "about": "The offset." } + ]}, + { "name": "TopicPartitions", "versions": "0+", "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The topic ID." }, + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions." } + ]}, + { "name": "Assignment", "versions": "0+", "fields": [ + { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", + "about": "Active tasks for this client." }, + { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", + "about": "Standby tasks for this client." }, + { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", + "about": "Warm-up tasks for this client. " } + ]}, + { "name": "TaskIds", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions of the input topics processed by this member." } + ]}, + { "name": "KeyValue", "versions": "0+", "fields": [ + { "name": "Key", "type": "string", "versions": "0+", + "about": "key of the config" }, + { "name": "Value", "type": "string", "versions": "0+", + "about": "value of the config" } + ]}, + { "name": "TopicInfo", "versions": "0+", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The name of the topic." }, + { "name": "Partitions", "type": "int32", "versions": "0+", + "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." }, + { "name": "ReplicationFactor", "type": "int16", "versions": "0+", + "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, + { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", + "about": "Topic-level configurations as key-value pairs." 
+      }
+    ]}
+  ]
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index bc0a304a09f..09671850b8e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -234,6 +234,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState;
 import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState;
 import org.apache.kafka.common.message.StopReplicaResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -1127,6 +1129,7 @@ public class RequestResponseTest {
             case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version);
             case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version);
             case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version);
+            case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -1222,6 +1225,7 @@ public class RequestResponseTest {
             case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse();
             case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse();
             case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse();
+            case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse();
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -4039,10 +4043,33 @@
         return new ReadShareGroupStateSummaryResponse(data);
     }
 
+    private AbstractRequest createStreamsGroupDescribeRequest(final short version) {
+        return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData()
+            .setGroupIds(Collections.singletonList("group"))
+            .setIncludeAuthorizedOperations(false)).build(version);
+    }
+
     private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) {
         return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version);
     }
 
+    private AbstractResponse createStreamsGroupDescribeResponse() {
+        StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
+            .setGroups(Collections.singletonList(
+                new StreamsGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId("group")
+                    .setErrorCode((short) 0)
+                    .setErrorMessage(Errors.forCode((short) 0).message())
+                    .setGroupState("EMPTY")
+                    .setGroupEpoch(0)
+                    .setAssignmentEpoch(0)
+                    .setMembers(new ArrayList<>(0))
+                    .setTopology(null)
+            ))
+            .setThrottleTimeMs(1000);
+        return new StreamsGroupDescribeResponse(data);
+    }
+
     private AbstractResponse createStreamsGroupHeartbeatResponse() {
         return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData());
     }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index be332e9085f..9bb76a66030 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -743,6 +743,9 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.STREAMS_GROUP_HEARTBEAT =>
         new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData(), true)
 
+      case ApiKeys.STREAMS_GROUP_DESCRIBE =>
+        new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData(), true)
+
       case _ =>
         throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index 66fd9b52e20..f7b26601f7c 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -175,6 +175,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatRequestDataJsonConvert
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter;
 import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter;
 import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter;
+import org.apache.kafka.common.message.StreamsGroupDescribeRequestDataJsonConverter;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseDataJsonConverter;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter;
 import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
@@ -358,6 +360,8 @@ import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
 import org.apache.kafka.common.requests.StopReplicaRequest;
 import org.apache.kafka.common.requests.StopReplicaResponse;
+import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
+import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
@@ -545,6 +549,8 @@ public class RequestConvertToJson {
                 return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
             case SHARE_GROUP_HEARTBEAT:
                 return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
+            case STREAMS_GROUP_DESCRIBE:
+                return StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest) request).data(), request.version());
             case STREAMS_GROUP_HEARTBEAT:
                 return StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version());
             case STOP_REPLICA:
@@ -731,6 +737,8 @@
                 return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
             case SHARE_GROUP_HEARTBEAT:
                 return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
+            case STREAMS_GROUP_DESCRIBE:
+                return StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse) response).data(), version);
             case STREAMS_GROUP_HEARTBEAT:
                 return StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version);
             case STOP_REPLICA: