KAFKA-18283: Add StreamsGroupDescribe RPC definitions (#18230)

Adds a new RPC StreamsGroupDescribe that returns, given the group ID, all metadata related to the streams group, such as

 - The topology metadata of the group.
 - The topology epoch of the group.
 - The latest member metadata that each member provided through the StreamsGroupHeartbeat API.
 - The current target assignment generated by the assignor.
 - This just adds the JSON as defined in KIP-1071, together with some plumbing.

Reviewers: Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
Lucas Brutschy 2024-12-18 19:38:01 +01:00 committed by GitHub
parent 08efe735a4
commit 0055ef0a49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 416 additions and 1 deletions

View File

@ -131,7 +131,8 @@ public enum ApiKeys {
WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT);
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =

View File

@ -352,6 +352,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
case STREAMS_GROUP_DESCRIBE:
return StreamsGroupDescribeRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -289,6 +289,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatResponse.parse(responseBuffer, version);
case STREAMS_GROUP_DESCRIBE:
return StreamsGroupDescribeResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
public class StreamsGroupDescribeRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<StreamsGroupDescribeRequest> {
private final StreamsGroupDescribeRequestData data;
public Builder(StreamsGroupDescribeRequestData data) {
this(data, false);
}
public Builder(StreamsGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.STREAMS_GROUP_DESCRIBE, enableUnstableLastVersion);
this.data = data;
}
@Override
public StreamsGroupDescribeRequest build(short version) {
return new StreamsGroupDescribeRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final StreamsGroupDescribeRequestData data;
public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short version) {
super(ApiKeys.STREAMS_GROUP_DESCRIBE, version);
this.data = data;
}
@Override
public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
.setThrottleTimeMs(throttleTimeMs);
// Set error for each group
this.data.groupIds().forEach(
groupId -> data.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.forException(e).code())
)
);
return new StreamsGroupDescribeResponse(data);
}
@Override
public StreamsGroupDescribeRequestData data() {
return data;
}
public static StreamsGroupDescribeRequest parse(ByteBuffer buffer, short version) {
return new StreamsGroupDescribeRequest(
new StreamsGroupDescribeRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
public static List<StreamsGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
List<String> groupIds,
Errors error
) {
return groupIds.stream()
.map(groupId -> new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(error.code())
).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
*/
public class StreamsGroupDescribeResponse extends AbstractResponse {
private final StreamsGroupDescribeResponseData data;
public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data) {
super(ApiKeys.STREAMS_GROUP_DESCRIBE);
this.data = data;
}
@Override
public StreamsGroupDescribeResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
data.groups().forEach(
group -> updateErrorCounts(counts, Errors.forCode(group.errorCode()))
);
return counts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static StreamsGroupDescribeResponse parse(ByteBuffer buffer, short version) {
return new StreamsGroupDescribeResponse(
new StreamsGroupDescribeResponseData(new ByteBufferAccessor(buffer), version)
);
}
}

View File

@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 89,
"type": "request",
"listeners": ["broker", "zkBroker"],
"name": "StreamsGroupDescribeRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The ids of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include authorized operations." }
]
}

View File

@ -0,0 +1,167 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 89,
"type": "response",
"name": "StreamsGroupDescribeResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
"about": "Each described group.",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The describe error, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group ID string." },
{ "name": "GroupState", "type": "string", "versions": "0+",
"about": "The group state string, or the empty string." },
{ "name": "GroupEpoch", "type": "int32", "versions": "0+",
"about": "The group epoch." },
{ "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
"about": "The assignment epoch." },
{ "name": "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The topology metadata currently initialized for the streams application. Can be null in case of a describe error.",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The epoch of the currently initialized topology for this group." },
{ "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The subtopologies of the streams application. This contains the configured subtopologies, where the number of partitions are set and any regular expressions are resolved to actual topics. Null if the group is uninitialized, source topics are missing or incorrectly partitioned.",
"fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "String to uniquely identify the subtopology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the subtopology reads from." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
"about": "The repartition topics the subtopology writes to." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
"about": "The set of state changelog topics associated with this subtopology. Created automatically." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
"about": "The set of source topics that are internally created repartition topics. Created automatically." }
]}
]},
{ "name": "Members", "type": "[]Member", "versions": "0+",
"about": "The members.",
"fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The member epoch." },
{ "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member instance ID for static membership." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The rack ID." },
{ "name": "ClientId", "type": "string", "versions": "0+",
"about": "The client ID." },
{ "name": "ClientHost", "type": "string", "versions": "0+",
"about": "The client host." },
{ "name": "TopologyEpoch", "type": "int32", "versions": "0+",
"about": "The epoch of the topology on the client." },
{ "name": "ProcessId", "type": "string", "versions": "0+",
"about": "Identity of the streams instance that may have multiple clients. " },
{ "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "User-defined endpoint for Interactive Queries. Null if not defined for this client." },
{ "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
"about": "Used for rack-aware assignment algorithm." },
{ "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+",
"about": "Cumulative changelog offsets for tasks." },
{ "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+",
"about": "Cumulative changelog end offsets for tasks." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+",
"about": "The current assignment." },
{ "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
"about": "The target assignment." },
{ "name": "IsClassic", "type": "bool", "versions": "0+",
"about": "True for classic members that have not been upgraded yet." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
]
}
],
"commonStructs": [
{ "name": "Endpoint", "versions": "0+", "fields": [
{ "name": "Host", "type": "string", "versions": "0+",
"about": "host of the endpoint" },
{ "name": "Port", "type": "uint16", "versions": "0+",
"about": "port of the endpoint" }
]},
{ "name": "TaskOffset", "versions": "0+", "fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition." },
{ "name": "Offset", "type": "int64", "versions": "0+",
"about": "The offset." }
]},
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]},
{ "name": "Assignment", "versions": "0+", "fields": [
{ "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+",
"about": "Active tasks for this client." },
{ "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+",
"about": "Standby tasks for this client." },
{ "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
"about": "Warm-up tasks for this client. " }
]},
{ "name": "TaskIds", "versions": "0+", "fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this member." }
]},
{ "name": "KeyValue", "versions": "0+", "fields": [
{ "name": "Key", "type": "string", "versions": "0+",
"about": "key of the config" },
{ "name": "Value", "type": "string", "versions": "0+",
"about": "value of the config" }
]},
{ "name": "TopicInfo", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The name of the topic." },
{ "name": "Partitions", "type": "int32", "versions": "0+",
"about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
{ "name": "ReplicationFactor", "type": "int16", "versions": "0+",
"about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." },
{ "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
"about": "Topic-level configurations as key-value pairs."
}
]}
]
}

View File

@ -234,6 +234,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState;
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState;
import org.apache.kafka.common.message.StopReplicaResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
@ -1127,6 +1129,7 @@ public class RequestResponseTest {
case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version);
case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version);
case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version);
case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1222,6 +1225,7 @@ public class RequestResponseTest {
case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse();
case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse();
case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse();
case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -4039,10 +4043,33 @@ public class RequestResponseTest {
return new ReadShareGroupStateSummaryResponse(data);
}
private AbstractRequest createStreamsGroupDescribeRequest(final short version) {
return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData()
.setGroupIds(Collections.singletonList("group"))
.setIncludeAuthorizedOperations(false)).build(version);
}
private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) {
return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version);
}
private AbstractResponse createStreamsGroupDescribeResponse() {
StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
.setGroups(Collections.singletonList(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("group")
.setErrorCode((short) 0)
.setErrorMessage(Errors.forCode((short) 0).message())
.setGroupState("EMPTY")
.setGroupEpoch(0)
.setAssignmentEpoch(0)
.setMembers(new ArrayList<>(0))
.setTopology(null)
))
.setThrottleTimeMs(1000);
return new StreamsGroupDescribeResponse(data);
}
private AbstractResponse createStreamsGroupHeartbeatResponse() {
return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData());
}

View File

@ -743,6 +743,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.STREAMS_GROUP_HEARTBEAT =>
new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData(), true)
case ApiKeys.STREAMS_GROUP_DESCRIBE =>
new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}

View File

@ -175,6 +175,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatRequestDataJsonConvert
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter;
import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter;
import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter;
import org.apache.kafka.common.message.StreamsGroupDescribeRequestDataJsonConverter;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseDataJsonConverter;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter;
import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
@ -358,6 +360,8 @@ import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
@ -545,6 +549,8 @@ public class RequestConvertToJson {
return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
case STREAMS_GROUP_DESCRIBE:
return StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest) request).data(), request.version());
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version());
case STOP_REPLICA:
@ -731,6 +737,8 @@ public class RequestConvertToJson {
return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
case STREAMS_GROUP_DESCRIBE:
return StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse) response).data(), version);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version);
case STOP_REPLICA: