KAFKA-18282: Add StreamsGroupHeartbeat RPC definitions (#18227)

The StreamsGroupHeartbeat API is the new core API used by Streams applications to form a group. It allows members to initialize a topology and to advertise their state and their owned tasks. The group coordinator uses it to assign tasks to and revoke tasks from members. The API also serves as a liveness check.

This change adds the JSON definition of the RPC, as defined in KIP-1071.
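
For orientation, the sketch below shows how a member's initial (join) heartbeat could be assembled with the request classes added in this change. The group ID, member ID, topology contents, and timeout value are made-up placeholders, and the surrounding client plumbing is omitted; passing true for enableUnstableLastVersion mirrors how the tests in this change construct the builder.

import java.util.List;

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;

public class StreamsGroupHeartbeatJoinExample {

    public static StreamsGroupHeartbeatRequest buildJoinRequest(short version) {
        // The topology is only sent on the initial heartbeat, i.e. when MemberEpoch == 0.
        StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology()
            .setEpoch(1)
            .setSubtopologies(List.of(
                new StreamsGroupHeartbeatRequestData.Subtopology()
                    .setSubtopologyId("subtopology-0")
                    .setSourceTopics(List.of("input-topic"))));

        StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData()
            .setGroupId("my-streams-app")
            .setMemberId("member-uuid")
            .setMemberEpoch(StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH)
            .setRebalanceTimeoutMs(60_000)
            .setTopology(topology);

        // enableUnstableLastVersion = true, as in the tests added by this change.
        return new StreamsGroupHeartbeatRequest.Builder(data, true).build(version);
    }
}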

Reviewers: Bruno Cadonna <cadonna@apache.org>
Lucas Brutschy 2024-12-18 11:43:44 +01:00 committed by GitHub
parent 4bcbf9fae7
commit ec32c8a376
14 changed files with 546 additions and 2 deletions


@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
 * Thrown when the supplied topology epoch is invalid for the streams group.
 */
public class StreamsInvalidTopologyEpochException extends ApiException {

    public StreamsInvalidTopologyEpochException(String message) {
        super(message);
    }
}


@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
 * Thrown when the supplied topology is invalid.
 */
public class StreamsInvalidTopologyException extends ApiException {

    public StreamsInvalidTopologyException(String message) {
        super(message);
    }
}


@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
 * Thrown when the supplied topology epoch is outdated for the streams group.
 */
public class StreamsTopologyFencedException extends ApiException {

    public StreamsTopologyFencedException(String message) {
        super(message);
    }
}


@ -130,7 +130,9 @@ public enum ApiKeys {
READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true),
WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true);
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);


@ -119,6 +119,9 @@ import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
@ -413,7 +416,10 @@ public enum Errors {
DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new),
VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new),
INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", InvalidRegularExpression::new),
REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new);
REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new),
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
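
As a quick illustration (not part of the change), the three new error codes registered above round-trip through the existing Errors helpers to the new exception classes:

import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.protocol.Errors;

public class StreamsErrorsExample {

    public static void main(String[] args) {
        // Code 130 is registered above for STREAMS_INVALID_TOPOLOGY.
        Errors error = Errors.forCode((short) 130);
        System.out.println(error);            // STREAMS_INVALID_TOPOLOGY
        System.out.println(error.message());  // "The supplied topology is invalid."

        // The mapping also works in the other direction, from exception to code.
        short code = Errors.forException(new StreamsInvalidTopologyException("bad topology")).code();
        System.out.println(code);             // 130
    }
}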


@ -350,6 +350,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return DeleteShareGroupStateRequest.parse(buffer, apiVersion);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));


@ -287,6 +287,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return DeleteShareGroupStateResponse.parse(responseBuffer, version);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));


@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;

public class StreamsGroupHeartbeatRequest extends AbstractRequest {

    /**
     * A member epoch of <code>-1</code> means that the member wants to leave the group.
     */
    public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    /**
     * A member epoch of <code>-2</code> indicates that the static member will rejoin the group after leaving.
     */
    public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    /**
     * A member epoch of <code>0</code> means that the member wants to join the group.
     */
    public static final int JOIN_GROUP_MEMBER_EPOCH = 0;

    public static class Builder extends AbstractRequest.Builder<StreamsGroupHeartbeatRequest> {

        private final StreamsGroupHeartbeatRequestData data;

        public Builder(StreamsGroupHeartbeatRequestData data) {
            this(data, false);
        }

        public Builder(StreamsGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
            super(ApiKeys.STREAMS_GROUP_HEARTBEAT, enableUnstableLastVersion);
            this.data = data;
        }

        @Override
        public StreamsGroupHeartbeatRequest build(short version) {
            return new StreamsGroupHeartbeatRequest(data, version);
        }

        @Override
        public String toString() {
            return data.toString();
        }
    }

    private final StreamsGroupHeartbeatRequestData data;

    public StreamsGroupHeartbeatRequest(StreamsGroupHeartbeatRequestData data, short version) {
        super(ApiKeys.STREAMS_GROUP_HEARTBEAT, version);
        this.data = data;
    }

    @Override
    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
        return new StreamsGroupHeartbeatResponse(
            new StreamsGroupHeartbeatResponseData()
                .setThrottleTimeMs(throttleTimeMs)
                .setErrorCode(Errors.forException(e).code())
        );
    }

    @Override
    public StreamsGroupHeartbeatRequestData data() {
        return data;
    }

    public static StreamsGroupHeartbeatRequest parse(ByteBuffer buffer, short version) {
        return new StreamsGroupHeartbeatRequest(new StreamsGroupHeartbeatRequestData(
            new ByteBufferAccessor(buffer), version), version);
    }
}
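
A leave heartbeat, by contrast, is minimal: per the epoch constants above, the member only repeats its identity and sends an epoch of -1, leaving every nullable field at its null default ("unchanged since the last heartbeat"). A sketch with placeholder IDs:

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;

public class StreamsGroupHeartbeatLeaveExample {

    public static StreamsGroupHeartbeatRequest buildLeaveRequest(short version) {
        // A leave heartbeat repeats only the identity fields and the -1 epoch; all nullable
        // fields stay at their null default, meaning "unchanged since the last heartbeat".
        StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData()
            .setGroupId("my-streams-app")
            .setMemberId("member-uuid")
            .setMemberEpoch(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);
        return new StreamsGroupHeartbeatRequest.Builder(data, true).build(version);
    }

    public static AbstractResponse errorResponseFor(StreamsGroupHeartbeatRequest request, Throwable error) {
        // getErrorResponse (defined above) maps any exception to the matching error code.
        return request.getErrorResponse(0, error);
    }
}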


@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

/**
 * Possible error codes.
 *
 * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
 * - {@link Errors#NOT_COORDINATOR}
 * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
 * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
 * - {@link Errors#INVALID_REQUEST}
 * - {@link Errors#UNKNOWN_MEMBER_ID}
 * - {@link Errors#FENCED_MEMBER_EPOCH}
 * - {@link Errors#UNRELEASED_INSTANCE_ID}
 * - {@link Errors#GROUP_MAX_SIZE_REACHED}
 * - {@link Errors#GROUP_ID_NOT_FOUND}
 * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
 * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
 * - {@link Errors#STREAMS_INVALID_TOPOLOGY}
 * - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH}
 * - {@link Errors#STREAMS_TOPOLOGY_FENCED}
 */
public class StreamsGroupHeartbeatResponse extends AbstractResponse {

    private final StreamsGroupHeartbeatResponseData data;

    public StreamsGroupHeartbeatResponse(StreamsGroupHeartbeatResponseData data) {
        super(ApiKeys.STREAMS_GROUP_HEARTBEAT);
        this.data = data;
    }

    @Override
    public StreamsGroupHeartbeatResponseData data() {
        return data;
    }

    @Override
    public Map<Errors, Integer> errorCounts() {
        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
    }

    @Override
    public int throttleTimeMs() {
        return data.throttleTimeMs();
    }

    @Override
    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
        data.setThrottleTimeMs(throttleTimeMs);
    }

    public static StreamsGroupHeartbeatResponse parse(ByteBuffer buffer, short version) {
        return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData(
            new ByteBufferAccessor(buffer), version));
    }

    public enum Status {
        STALE_TOPOLOGY((byte) 0, "The topology epoch supplied is inconsistent with the topology for this streams group."),
        MISSING_SOURCE_TOPICS((byte) 1, "One or more source topics are missing or a source topic regex resolves to zero topics."),
        INCORRECTLY_PARTITIONED_TOPICS((byte) 2, "One or more topics expected to be copartitioned are not copartitioned."),
        MISSING_INTERNAL_TOPICS((byte) 3, "One or more internal topics are missing."),
        SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the whole application.");

        private final byte code;
        private final String message;

        Status(final byte code, final String message) {
            this.code = code;
            this.message = message;
        }

        public byte code() {
            return code;
        }

        public String message() {
            return message;
        }
    }
}
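
The Status enum above mirrors the numeric status codes carried in the response. A small sketch (class and helper names are illustrative) of resolving those codes on the client side:

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;

public class StreamsGroupHeartbeatStatusExample {

    static StreamsGroupHeartbeatResponse.Status statusFromCode(byte code) {
        // The enum does not expose a lookup by code, so scan the values.
        for (StreamsGroupHeartbeatResponse.Status status : StreamsGroupHeartbeatResponse.Status.values()) {
            if (status.code() == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown status code: " + code);
    }

    static void logStatuses(StreamsGroupHeartbeatResponseData data) {
        if (data.status() == null) {
            return; // Null means the status did not change since the last heartbeat.
        }
        for (StreamsGroupHeartbeatResponseData.Status status : data.status()) {
            System.out.println(statusFromCode(status.statusCode()) + ": " + status.statusDetail());
        }
    }
}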


@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 88,
"type": "request",
"listeners": ["broker", "zkBroker"],
"name": "StreamsGroupHeartbeatRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." },
{ "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of the member otherwise." },
{ "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
"about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its tasks otherwise." },
{ "name": "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The topology metadata of the streams application. Used to initialize the topology of the group and to check if the topology corresponds to the topology initialized for the group. Only sent when memberEpoch = 0, must be non-empty. Null otherwise.",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The epoch of the topology. Used to check if the topology corresponds to the topology initialized on the brokers." },
{ "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+",
"about": "The sub-topologies of the streams application.",
"fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "String to uniquely identify the subtopology. Deterministically generated from the topology" },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
{ "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
"about": "The regular expressions identifying topics the subtopology reads from." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
"about": "The set of state changelog topics associated with this subtopology. Created automatically." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
"about": "The repartition topics the subtopology writes to." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
"about": "The set of source topics that are internally created repartition topics. Created automatically." },
{ "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+",
"about": "A subset of source topics that must be copartitioned.",
"fields": [
{ "name": "SourceTopics", "type": "[]int16", "versions": "0+",
"about": "The topics the topology reads from. Index into the array on the subtopology level." },
{ "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+",
"about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." },
{ "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+",
"about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." }
]}
]}
]
},
{ "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Currently owned active tasks for this client. Null if unchanged since last heartbeat." },
{ "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Currently owned standby tasks for this client. Null if unchanged since last heartbeat." },
{ "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Currently owned warm-up tasks for this client. Null if unchanged since last heartbeat." },
{ "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Identity of the streams instance that may have multiple consumers. Null if unchanged since last heartbeat." },
{ "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "User-defined endpoint for Interactive Queries. Null if unchanged since last heartbeat, or if not defined on the client." },
{ "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." },
{ "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Cumulative changelog offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." },
{ "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Cumulative changelog end-offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." },
{ "name": "ShutdownApplication", "type": "bool", "versions": "0+", "default": false,
"about": "Whether all Streams clients in the group should shut down." }
],
"commonStructs": [
{ "name": "KeyValue", "versions": "0+", "fields": [
{ "name": "Key", "type": "string", "versions": "0+",
"about": "key of the config" },
{ "name": "Value", "type": "string", "versions": "0+",
"about": "value of the config" }
]},
{ "name": "TopicInfo", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The name of the topic." },
{ "name": "Partitions", "type": "int32", "versions": "0+",
"about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
{ "name": "ReplicationFactor", "type": "int16", "versions": "0+",
"about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." },
{ "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
"about": "Topic-level configurations as key-value pairs."
}
]},
{ "name": "Endpoint", "versions": "0+", "fields": [
{ "name": "Host", "type": "string", "versions": "0+",
"about": "host of the endpoint" },
{ "name": "Port", "type": "uint16", "versions": "0+",
"about": "port of the endpoint" }
]},
{ "name": "TaskOffset", "versions": "0+", "fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition." },
{ "name": "Offset", "type": "int64", "versions": "0+",
"about": "The offset." }
]},
{ "name": "TaskIds", "versions": "0+", "fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this member." }
]}
]
}
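
After joining, regular heartbeats only need to repeat the identity fields and report what changed; everything else stays null, per the field descriptions above. A sketch with placeholder IDs of reporting the currently owned active tasks:

import java.util.List;

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;

public class SteadyStateHeartbeatExample {

    public static StreamsGroupHeartbeatRequestData steadyStateData(String groupId, String memberId, int memberEpoch) {
        // Topology, InstanceId, RackId, ClientTags, etc. are left null ("unchanged since
        // the last heartbeat"); only the owned active tasks are reported here.
        return new StreamsGroupHeartbeatRequestData()
            .setGroupId(groupId)
            .setMemberId(memberId)
            .setMemberEpoch(memberEpoch)
            .setActiveTasks(List.of(
                new StreamsGroupHeartbeatRequestData.TaskIds()
                    .setSubtopologyId("subtopology-0")
                    .setPartitions(List.of(0, 1))));
    }
}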


@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 88,
"type": "response",
"name": "StreamsGroupHeartbeatResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_MEMBER_ID (version 0+)
// - FENCED_MEMBER_EPOCH (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - CLUSTER_AUTHORIZATION_FAILED (version 0+)
// - STREAMS_INVALID_TOPOLOGY (version 0+)
// - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
// - STREAMS_TOPOLOGY_FENCED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id is always generated by the streams consumer."},
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The member epoch." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
"about": "The heartbeat interval in milliseconds." },
{ "name": "AcceptableRecoveryLag", "type": "int32", "versions": "0+",
"about": "The maximal lag a warm-up task can have to be considered caught-up." },
{ "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
"about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },
{ "name": "Status", "type": "[]Status", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Indicate zero or more status for the group. Null if unchanged since last heartbeat." },
// The streams app knows which partitions to fetch from given this information
{ "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Assigned active tasks for this client. Null if unchanged since last heartbeat." },
{ "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Assigned standby tasks for this client. Null if unchanged since last heartbeat." },
{ "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." },
// IQ-related information
{ "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." ,
"fields": [
{ "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
"about": "User-defined endpoint to connect to the node" },
{ "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
"about": "All partitions available on the node" }
]
}
],
"commonStructs": [
{ "name": "Status", "versions": "0+", "fields": [
// Possible status codes
// 0 - STALE_TOPOLOGY - The topology epoch supplied is lower than the topology epoch for this streams group.
// 1 - MISSING_SOURCE_TOPICS - One or more source topics are missing or a source topic regex resolves to zero topics.
// Missing topics are indicated in the StatusDetail.
// 2 - INCORRECTLY_PARTITIONED_TOPICS - One or more topics are incorrectly partitioned, that is, they are not copartitioned despite being
// part of a copartition group, or the number of partitions in a changelog topic does not correspond
// to the maximal number of source topic partitions for that subtopology.
// Incorrectly partitioned topics are indicated in the StatusDetail.
// 3 - MISSING_INTERNAL_TOPICS - One or more internal topics are missing.
// Missing topics are indicated in the StatusDetail.
// The group coordinator will attempt to create all missing internal topics; if any errors occur during
// topic creation, this will be indicated in StatusDetail.
// 4 - SHUTDOWN_APPLICATION - A client requested the shutdown of the whole application.
{ "name": "StatusCode", "type": "int8", "versions": "0+",
"about": "A code to indicate that a particular status is active for the group membership" },
{ "name": "StatusDetail", "type": "string", "versions": "0+",
"about": "A string representation of the status." }
]},
{ "name": "TopicPartition", "versions": "0+", "fields": [
{ "name": "Topic", "type": "string", "versions": "0+",
"about": "topic name" },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "partitions" }
]},
{ "name": "TaskIds", "versions": "0+", "fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this member." }
]},
{ "name": "Endpoint", "versions": "0+", "fields": [
{ "name": "Host", "type": "string", "versions": "0+",
"about": "host of the endpoint" },
{ "name": "Port", "type": "uint16", "versions": "0+",
"about": "port of the endpoint" }
]}
]
}
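
On the receiving side, a null task array likewise means the assignment is unchanged; otherwise the client walks the TaskIds entries. A minimal, illustrative sketch:

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;

public class AssignedTasksExample {

    public static void printAssignedActiveTasks(StreamsGroupHeartbeatResponseData data) {
        if (data.activeTasks() == null) {
            // Null means the active task assignment did not change since the last heartbeat.
            return;
        }
        for (StreamsGroupHeartbeatResponseData.TaskIds task : data.activeTasks()) {
            System.out.println("subtopology " + task.subtopologyId() + " -> partitions " + task.partitions());
        }
    }
}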


@ -234,6 +234,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState;
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState;
import org.apache.kafka.common.message.StopReplicaResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
@ -1124,6 +1126,7 @@ public class RequestResponseTest {
case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateRequest(version);
case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version);
case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version);
case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1218,6 +1221,7 @@ public class RequestResponseTest {
case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse();
case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse();
case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse();
case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -4035,6 +4039,14 @@ public class RequestResponseTest {
return new ReadShareGroupStateSummaryResponse(data);
}
private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) {
return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version);
}
private AbstractResponse createStreamsGroupHeartbeatResponse() {
return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData());
}
@Test
public void testInvalidSaslHandShakeRequest() {
AbstractRequest request = new SaslHandshakeRequest.Builder(


@ -739,6 +739,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY =>
new ReadShareGroupStateSummaryRequest.Builder(new ReadShareGroupStateSummaryRequestData(), true)
case ApiKeys.STREAMS_GROUP_HEARTBEAT =>
new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)


@ -175,6 +175,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatRequestDataJsonConvert
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter;
import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter;
import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter;
import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter;
import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter;
@ -356,6 +358,8 @@ import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@ -541,6 +545,8 @@ public class RequestConvertToJson {
return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version());
case STOP_REPLICA:
return StopReplicaRequestDataJsonConverter.write(((StopReplicaRequest) request).data(), request.version());
case SYNC_GROUP:
@ -725,6 +731,8 @@ public class RequestConvertToJson {
return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version);
case STOP_REPLICA:
return StopReplicaResponseDataJsonConverter.write(((StopReplicaResponse) response).data(), version);
case SYNC_GROUP: