KAFKA-16713: Define initial set of RPCs for KIP-932 (#16022)

This PR defines the initial set of RPCs for KIP-932. The RPCs for the admin client and state management are not in this PR.

Reviewers: Apoorv Mittal <amittal@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Andrew Schofield (committed by GitHub)
Date: 2024-06-03 07:22:35 +01:00
Parent: 8507693229
Commit: 8f82f14a48
35 changed files with 2100 additions and 9 deletions

clients/src/main/java/org/apache/kafka/common/ShareGroupState.java

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* The share group state.
*/
public enum ShareGroupState {
UNKNOWN("Unknown"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty");
private final static Map<String, ShareGroupState> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
private final String name;
ShareGroupState(String name) {
this.name = name;
}
/**
* Case-insensitive share group state lookup by string name.
*/
public static ShareGroupState parse(String name) {
ShareGroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT));
return state == null ? UNKNOWN : state;
}
@Override
public String toString() {
return name;
}
}
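
The lookup above is case-insensitive and falls back to UNKNOWN for unrecognised names. A minimal usage sketch (illustrative only, not part of this commit; the class name ShareGroupStateParseExample and the sample strings are made up):

// Illustrative only: exercises the case-insensitive lookup and UNKNOWN fallback of ShareGroupState.parse.
import org.apache.kafka.common.ShareGroupState;

public class ShareGroupStateParseExample {
    public static void main(String[] args) {
        System.out.println(ShareGroupState.parse("stable"));   // Stable
        System.out.println(ShareGroupState.parse("DEAD"));     // Dead
        System.out.println(ShareGroupState.parse("archived")); // Unknown (no matching state name)
    }
}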

clients/src/main/java/org/apache/kafka/common/errors/FencedStateEpochException.java

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* Thrown when the share coordinator rejected the request because the share-group state epoch did not match.
*/
public class FencedStateEpochException extends ApiException {
private static final long serialVersionUID = 1L;
public FencedStateEpochException(String message) {
super(message);
}
}

clients/src/main/java/org/apache/kafka/common/errors/InvalidRecordStateException.java

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* Thrown when the acknowledgement of delivery of a record could not be completed because the record
* state is invalid.
*/
public class InvalidRecordStateException extends ApiException {
private static final long serialVersionUID = 1L;
public InvalidRecordStateException(String message) {
super(message);
}
}

clients/src/main/java/org/apache/kafka/common/errors/InvalidShareSessionEpochException.java

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* Thrown when the share session epoch is invalid.
*/
public class InvalidShareSessionEpochException extends RetriableException {
private static final long serialVersionUID = 1L;
public InvalidShareSessionEpochException(String message) {
super(message);
}
}

clients/src/main/java/org/apache/kafka/common/errors/ShareSessionNotFoundException.java

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* Thrown when the share session was not found.
*/
public class ShareSessionNotFoundException extends RetriableException {
private static final long serialVersionUID = 1L;
public ShareSessionNotFoundException(String message) {
super(message);
}
}

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

@@ -118,7 +118,11 @@ public enum ApiKeys {
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS);
DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS),
SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT),
SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE),
SHARE_FETCH(ApiMessageType.SHARE_FETCH),
SHARE_ACKNOWLEDGE(ApiMessageType.SHARE_ACKNOWLEDGE);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

@@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.FetchSessionTopicIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -64,12 +65,14 @@ import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
@@ -109,6 +112,7 @@ import org.apache.kafka.common.errors.ResourceNotFoundException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -394,7 +398,11 @@ public enum Errors {
UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new),
TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new),
INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new),
TRANSACTION_ABORTABLE(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", TransactionAbortableException::new);
TRANSACTION_ABORTABLE(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", TransactionAbortableException::new),
INVALID_RECORD_STATE(121, "The record state is invalid. The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new),
SHARE_SESSION_NOT_FOUND(122, "The share session was not found.", ShareSessionNotFoundException::new),
INVALID_SHARE_SESSION_EPOCH(123, "The share session epoch is invalid.", InvalidShareSessionEpochException::new),
FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

@@ -326,6 +326,14 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return ListClientMetricsResourcesRequest.parse(buffer, apiVersion);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsRequest.parse(buffer, apiVersion);
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatRequest.parse(buffer, apiVersion);
case SHARE_GROUP_DESCRIBE:
return ShareGroupDescribeRequest.parse(buffer, apiVersion);
case SHARE_FETCH:
return ShareFetchRequest.parse(buffer, apiVersion);
case SHARE_ACKNOWLEDGE:
return ShareAcknowledgeRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@@ -263,6 +263,14 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return ListClientMetricsResourcesResponse.parse(responseBuffer, version);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsResponse.parse(responseBuffer, version);
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatResponse.parse(responseBuffer, version);
case SHARE_GROUP_DESCRIBE:
return ShareGroupDescribeResponse.parse(responseBuffer, version);
case SHARE_FETCH:
return ShareFetchResponse.parse(responseBuffer, version);
case SHARE_ACKNOWLEDGE:
return ShareAcknowledgeResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java

@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ShareAcknowledgeRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ShareAcknowledgeRequest> {
private final ShareAcknowledgeRequestData data;
public Builder(ShareAcknowledgeRequestData data) {
this(data, false);
}
public Builder(ShareAcknowledgeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_ACKNOWLEDGE, enableUnstableLastVersion);
this.data = data;
}
public static ShareAcknowledgeRequest.Builder forConsumer(String groupId, ShareFetchMetadata metadata,
Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareAcknowledgeRequestData data = new ShareAcknowledgeRequestData();
data.setGroupId(groupId);
if (metadata != null) {
data.setMemberId(metadata.memberId().toString());
data.setShareSessionEpoch(metadata.epoch());
}
// Build a map of topics to acknowledge keyed by topic ID, and within each a map of partitions keyed by index
Map<Uuid, Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition>> ackMap = new HashMap<>();
for (Map.Entry<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
TopicIdPartition tip = acknowledgeEntry.getKey();
Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition> partMap = ackMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareAcknowledgeRequestData.AcknowledgePartition ackPartition = partMap.get(tip.partition());
if (ackPartition == null) {
ackPartition = new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), ackPartition);
}
ackPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
}
// Finally, build up the acknowledgement data from the map
data.setTopics(new ArrayList<>());
ackMap.forEach((topicId, partMap) -> {
ShareAcknowledgeRequestData.AcknowledgeTopic ackTopic = new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(topicId)
.setPartitions(new ArrayList<>());
data.topics().add(ackTopic);
partMap.forEach((index, ackPartition) -> ackTopic.partitions().add(ackPartition));
});
return new ShareAcknowledgeRequest.Builder(data, true);
}
public ShareAcknowledgeRequestData data() {
return data;
}
@Override
public ShareAcknowledgeRequest build(short version) {
return new ShareAcknowledgeRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ShareAcknowledgeRequestData data;
public ShareAcknowledgeRequest(ShareAcknowledgeRequestData data, short version) {
super(ApiKeys.SHARE_ACKNOWLEDGE, version);
this.data = data;
}
@Override
public ShareAcknowledgeRequestData data() {
return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
return new ShareAcknowledgeResponse(new ShareAcknowledgeResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code()));
}
public static ShareAcknowledgeRequest parse(ByteBuffer buffer, short version) {
return new ShareAcknowledgeRequest(
new ShareAcknowledgeRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}
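
A hedged sketch of how Builder.forConsumer might be used. The group ID, topic name and offsets are made up, and the AcknowledgementBatch setters (setFirstOffset, setLastOffset, setAcknowledgeTypes) are assumed from the ShareAcknowledgeRequest.json schema and the usual conventions of Kafka's generated message classes:

// Sketch only: builds a ShareAcknowledgeRequest acknowledging offsets 0-9 of one partition.
// Setter names on AcknowledgementBatch are assumptions based on the JSON schema in this commit.
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.requests.ShareAcknowledgeRequest;
import org.apache.kafka.common.requests.ShareFetchMetadata;

public class ShareAcknowledgeRequestExample {
    public static void main(String[] args) {
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("payments", 0));
        ShareAcknowledgeRequestData.AcknowledgementBatch batch =
            new ShareAcknowledgeRequestData.AcknowledgementBatch()
                .setFirstOffset(0L)
                .setLastOffset(9L)
                .setAcknowledgeTypes(Collections.singletonList((byte) 1)); // 1 = Accept
        Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acks =
            Collections.singletonMap(tip, Collections.singletonList(batch));
        // An established session: initial epoch 0, then incremented for the next request.
        ShareFetchMetadata metadata = ShareFetchMetadata.initialEpoch(Uuid.randomUuid()).nextEpoch();
        ShareAcknowledgeRequest request =
            ShareAcknowledgeRequest.Builder.forConsumer("my-share-group", metadata, acks).build((short) 0);
        System.out.println(request.data());
    }
}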

clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Possible error codes.
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
* - {@link Errors#NOT_LEADER_OR_FOLLOWER}
* - {@link Errors#UNKNOWN_TOPIC_ID}
* - {@link Errors#INVALID_RECORD_STATE}
* - {@link Errors#KAFKA_STORAGE_ERROR}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNKNOWN_SERVER_ERROR}
*/
public class ShareAcknowledgeResponse extends AbstractResponse {
private final ShareAcknowledgeResponseData data;
public ShareAcknowledgeResponse(ShareAcknowledgeResponseData data) {
super(ApiKeys.SHARE_ACKNOWLEDGE);
this.data = data;
}
public Errors error() {
return Errors.forCode(data.errorCode());
}
@Override
public ShareAcknowledgeResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
data.responses().forEach(
topic -> topic.partitions().forEach(
partition -> updateErrorCounts(counts, Errors.forCode(partition.errorCode()))
)
);
return counts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ShareAcknowledgeResponse parse(ByteBuffer buffer, short version) {
return new ShareAcknowledgeResponse(
new ShareAcknowledgeResponseData(new ByteBufferAccessor(buffer), version)
);
}
private static boolean matchingTopic(ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse previousTopic, TopicIdPartition currentTopic) {
if (previousTopic == null)
return false;
return previousTopic.topicId().equals(currentTopic.topicId());
}
public static ShareAcknowledgeResponseData.PartitionData partitionResponse(TopicIdPartition topicIdPartition, Errors error) {
return partitionResponse(topicIdPartition.topicPartition().partition(), error);
}
public static ShareAcknowledgeResponseData.PartitionData partitionResponse(int partition, Errors error) {
return new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(error.code());
}
public static ShareAcknowledgeResponse of(Errors error,
int throttleTimeMs,
LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> responseData,
List<Node> nodeEndpoints) {
return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
}
public static ShareAcknowledgeResponseData toMessage(Errors error, int throttleTimeMs,
Iterator<Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> partIterator,
List<Node> nodeEndpoints) {
Map<Uuid, ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse> topicResponseList = new LinkedHashMap<>();
while (partIterator.hasNext()) {
Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> entry = partIterator.next();
ShareAcknowledgeResponseData.PartitionData partitionData = entry.getValue();
// Since PartitionData alone doesn't know the partition ID, we set it here
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
// Checking if the topic is already present in the map
if (topicResponseList.containsKey(entry.getKey().topicId())) {
topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
} else {
List<ShareAcknowledgeResponseData.PartitionData> partitionResponses = new ArrayList<>();
partitionResponses.add(partitionData);
topicResponseList.put(entry.getKey().topicId(), new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse()
.setTopicId(entry.getKey().topicId())
.setPartitions(partitionResponses));
}
}
ShareAcknowledgeResponseData data = new ShareAcknowledgeResponseData();
// KafkaApis should only pass in node endpoints on error, otherwise this should be an empty list
nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(
new ShareAcknowledgeResponseData.NodeEndpoint()
.setNodeId(endpoint.id())
.setHost(endpoint.host())
.setPort(endpoint.port())
.setRack(endpoint.rack())));
return data.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
.setResponses(new ArrayList<>(topicResponseList.values()));
}
}

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchMetadata.java

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
public class ShareFetchMetadata {
/**
* The first epoch. When used in a ShareFetch request, indicates that the client
* wants to create a session.
*/
public static final int INITIAL_EPOCH = 0;
/**
* An invalid epoch. When used in a ShareFetch request, indicates that the client
* wants to close an existing session.
*/
public static final int FINAL_EPOCH = -1;
/**
 * Returns true if this metadata is for a new share session, i.e. the epoch is the initial epoch.
 */
public boolean isNewSession() {
return epoch == INITIAL_EPOCH;
}
/**
* Returns true if this is a full share fetch request.
*/
public boolean isFull() {
return (this.epoch == INITIAL_EPOCH) || (this.epoch == FINAL_EPOCH);
}
/**
* Returns the next epoch.
*
* @param prevEpoch The previous epoch.
* @return The next epoch.
*/
public static int nextEpoch(int prevEpoch) {
if (prevEpoch < 0) {
// The next epoch after FINAL_EPOCH is always FINAL_EPOCH itself.
return FINAL_EPOCH;
} else if (prevEpoch == Integer.MAX_VALUE) {
return 1;
} else {
return prevEpoch + 1;
}
}
/**
* The member ID.
*/
private final Uuid memberId;
/**
* The share session epoch.
*/
private final int epoch;
public ShareFetchMetadata(Uuid memberId, int epoch) {
this.memberId = memberId;
this.epoch = epoch;
}
public static ShareFetchMetadata initialEpoch(Uuid memberId) {
return new ShareFetchMetadata(memberId, INITIAL_EPOCH);
}
public ShareFetchMetadata nextEpoch() {
return new ShareFetchMetadata(memberId, nextEpoch(epoch));
}
public ShareFetchMetadata nextCloseExistingAttemptNew() {
return new ShareFetchMetadata(memberId, INITIAL_EPOCH);
}
public ShareFetchMetadata finalEpoch() {
return new ShareFetchMetadata(memberId, FINAL_EPOCH);
}
public Uuid memberId() {
return memberId;
}
public int epoch() {
return epoch;
}
public boolean isFinalEpoch() {
return epoch == FINAL_EPOCH;
}
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(memberId=").append(memberId).append(", ");
if (epoch == INITIAL_EPOCH) {
bld.append("epoch=INITIAL)");
} else if (epoch == FINAL_EPOCH) {
bld.append("epoch=FINAL)");
} else {
bld.append("epoch=").append(epoch).append(")");
}
return bld.toString();
}
}
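
An illustrative walk-through (not part of this commit) of the share session epoch transitions defined above; the class name ShareFetchMetadataExample is made up:

// Illustrative only: shows the epoch lifecycle of a share session.
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareFetchMetadata;

public class ShareFetchMetadataExample {
    public static void main(String[] args) {
        ShareFetchMetadata metadata = ShareFetchMetadata.initialEpoch(Uuid.randomUuid());
        System.out.println(metadata.isNewSession()); // true  (epoch == INITIAL_EPOCH == 0)
        System.out.println(metadata.isFull());       // true  (initial and final epochs are full requests)

        metadata = metadata.nextEpoch();             // epoch 1, 2, 3, ... for subsequent fetches
        System.out.println(metadata.isFull());       // false

        // The epoch wraps from Integer.MAX_VALUE back to 1, skipping 0 and -1.
        System.out.println(ShareFetchMetadata.nextEpoch(Integer.MAX_VALUE)); // 1

        metadata = metadata.finalEpoch();            // epoch -1 closes the session
        System.out.println(metadata.isFinalEpoch()); // true
        System.out.println(metadata);                // (memberId=..., epoch=FINAL)
    }
}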

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java

@@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ShareFetchRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ShareFetchRequest> {
private final ShareFetchRequestData data;
public Builder(ShareFetchRequestData data) {
this(data, false);
}
public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_FETCH, enableUnstableLastVersion);
this.data = data;
}
public static Builder forConsumer(String groupId, ShareFetchMetadata metadata,
int maxWait, int minBytes, int maxBytes, int fetchSize,
List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
data.setGroupId(groupId);
int ackOnlyPartitionMaxBytes = fetchSize;
boolean isClosingShareSession = false;
if (metadata != null) {
data.setMemberId(metadata.memberId().toString());
data.setShareSessionEpoch(metadata.epoch());
if (metadata.isFinalEpoch()) {
isClosingShareSession = true;
ackOnlyPartitionMaxBytes = 0;
}
}
data.setMaxWaitMs(maxWait);
data.setMinBytes(minBytes);
data.setMaxBytes(maxBytes);
// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();
// First, start by adding the list of topic-partitions we are fetching
if (!isClosingShareSession) {
for (TopicIdPartition tip : send) {
Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareFetchRequestData.FetchPartition fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition())
.setPartitionMaxBytes(fetchSize);
partMap.put(tip.partition(), fetchPartition);
}
}
// Next, add acknowledgements that we are piggybacking onto the fetch. Generally, the list of
// topic-partitions will be a subset, but if the assignment changes, there might be new entries to add
for (Map.Entry<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
TopicIdPartition tip = acknowledgeEntry.getKey();
Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareFetchRequestData.FetchPartition fetchPartition = partMap.get(tip.partition());
if (fetchPartition == null) {
fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition())
.setPartitionMaxBytes(ackOnlyPartitionMaxBytes);
partMap.put(tip.partition(), fetchPartition);
}
fetchPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
}
// Build up the data to fetch
if (!fetchMap.isEmpty()) {
data.setTopics(new ArrayList<>());
fetchMap.forEach((topicId, partMap) -> {
ShareFetchRequestData.FetchTopic fetchTopic = new ShareFetchRequestData.FetchTopic()
.setTopicId(topicId)
.setPartitions(new ArrayList<>());
partMap.forEach((index, fetchPartition) -> fetchTopic.partitions().add(fetchPartition));
data.topics().add(fetchTopic);
});
}
// And finally, forget the topic-partitions that are no longer in the session
if (!forget.isEmpty()) {
Map<Uuid, List<Integer>> forgetMap = new HashMap<>();
for (TopicIdPartition tip : forget) {
List<Integer> partList = forgetMap.computeIfAbsent(tip.topicId(), k -> new ArrayList<>());
partList.add(tip.partition());
}
data.setForgottenTopicsData(new ArrayList<>());
forgetMap.forEach((topicId, partList) -> {
ShareFetchRequestData.ForgottenTopic forgetTopic = new ShareFetchRequestData.ForgottenTopic()
.setTopicId(topicId)
.setPartitions(new ArrayList<>());
partList.forEach(index -> forgetTopic.partitions().add(index));
data.forgottenTopicsData().add(forgetTopic);
});
}
return new Builder(data, true);
}
public ShareFetchRequestData data() {
return data;
}
@Override
public ShareFetchRequest build(short version) {
return new ShareFetchRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ShareFetchRequestData data;
private volatile LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData = null;
private volatile List<TopicIdPartition> toForget = null;
public ShareFetchRequest(ShareFetchRequestData data, short version) {
super(ApiKeys.SHARE_FETCH, version);
this.data = data;
}
@Override
public ShareFetchRequestData data() {
return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
return new ShareFetchResponse(new ShareFetchResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code()));
}
public static ShareFetchRequest parse(ByteBuffer buffer, short version) {
return new ShareFetchRequest(
new ShareFetchRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
public static final class SharePartitionData {
public final Uuid topicId;
public final int maxBytes;
public SharePartitionData(
Uuid topicId,
int maxBytes
) {
this.topicId = topicId;
this.maxBytes = maxBytes;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareFetchRequest.SharePartitionData that = (ShareFetchRequest.SharePartitionData) o;
return Objects.equals(topicId, that.topicId) &&
maxBytes == that.maxBytes;
}
@Override
public int hashCode() {
return Objects.hash(topicId, maxBytes);
}
@Override
public String toString() {
return "SharePartitionData(" +
"topicId=" + topicId +
", maxBytes=" + maxBytes +
')';
}
}
public int minBytes() {
return data.minBytes();
}
public int maxBytes() {
return data.maxBytes();
}
public int maxWait() {
return data.maxWaitMs();
}
public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData(Map<Uuid, String> topicNames) {
if (shareFetchData == null) {
synchronized (this) {
if (shareFetchData == null) {
// Assigning the lazy-initialized `shareFetchData` in the last step
// to avoid other threads accessing a half-initialized object.
final LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataTmp = new LinkedHashMap<>();
data.topics().forEach(shareFetchTopic -> {
String name = topicNames.get(shareFetchTopic.topicId());
shareFetchTopic.partitions().forEach(shareFetchPartition -> {
// Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
shareFetchDataTmp.put(new TopicIdPartition(shareFetchTopic.topicId(), new TopicPartition(name, shareFetchPartition.partitionIndex())),
new ShareFetchRequest.SharePartitionData(
shareFetchTopic.topicId(),
shareFetchPartition.partitionMaxBytes()
)
);
});
});
shareFetchData = shareFetchDataTmp;
}
}
}
return shareFetchData;
}
public List<TopicIdPartition> forgottenTopics(Map<Uuid, String> topicNames) {
if (toForget == null) {
synchronized (this) {
if (toForget == null) {
// Assigning the lazy-initialized `toForget` in the last step
// to avoid other threads accessing a half-initialized object.
final List<TopicIdPartition> toForgetTmp = new ArrayList<>();
data.forgottenTopicsData().forEach(forgottenTopic -> {
String name = topicNames.get(forgottenTopic.topicId());
// Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
forgottenTopic.partitions().forEach(partitionId -> toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
});
toForget = toForgetTmp;
}
}
}
return toForget;
}
}
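
A hedged sketch of using Builder.forConsumer for a plain fetch with no piggybacked acknowledgements and nothing to forget. The group ID, topic name and size/wait values are made up; only methods defined in this file are used:

// Sketch only: builds a ShareFetchRequest for one topic-partition at the start of a session.
import java.util.Collections;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;

public class ShareFetchRequestExample {
    public static void main(String[] args) {
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("payments", 0));
        ShareFetchMetadata metadata = ShareFetchMetadata.initialEpoch(Uuid.randomUuid());
        ShareFetchRequest request = ShareFetchRequest.Builder.forConsumer(
                "my-share-group",
                metadata,
                500,                              // maxWait in milliseconds
                1,                                // minBytes
                1024 * 1024,                      // maxBytes
                64 * 1024,                        // fetchSize per partition
                Collections.singletonList(tip),   // partitions to fetch
                Collections.emptyList(),          // nothing to forget
                Collections.emptyMap()            // no piggybacked acknowledgements
        ).build((short) 0);
        System.out.println(request.data());
    }
}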

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* Possible error codes.
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
* - {@link Errors#NOT_LEADER_OR_FOLLOWER}
* - {@link Errors#UNKNOWN_TOPIC_ID}
* - {@link Errors#INVALID_RECORD_STATE}
* - {@link Errors#KAFKA_STORAGE_ERROR}
* - {@link Errors#CORRUPT_MESSAGE}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNKNOWN_SERVER_ERROR}
*/
public class ShareFetchResponse extends AbstractResponse {
private final ShareFetchResponseData data;
private volatile LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = null;
public ShareFetchResponse(ShareFetchResponseData data) {
super(ApiKeys.SHARE_FETCH);
this.data = data;
}
public Errors error() {
return Errors.forCode(data.errorCode());
}
@Override
public ShareFetchResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
data.responses().forEach(
topic -> topic.partitions().forEach(
partition -> updateErrorCounts(counts, Errors.forCode(partition.errorCode()))
)
);
return counts;
}
public LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames) {
if (responseData == null) {
synchronized (this) {
// Assigning the lazy-initialized `responseData` in the last step
// to avoid other threads accessing a half-initialized object.
if (responseData == null) {
final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseDataTmp = new LinkedHashMap<>();
data.responses().forEach(topicResponse -> {
String name = topicNames.get(topicResponse.topicId());
if (name != null) {
topicResponse.partitions().forEach(partitionData -> responseDataTmp.put(new TopicIdPartition(topicResponse.topicId(),
new TopicPartition(name, partitionData.partitionIndex())), partitionData));
}
});
responseData = responseDataTmp;
}
}
}
return responseData;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ShareFetchResponse parse(ByteBuffer buffer, short version) {
return new ShareFetchResponse(
new ShareFetchResponseData(new ByteBufferAccessor(buffer), version)
);
}
/**
* Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`.
*
* <p>If this response was deserialized after a share fetch, this method should never fail. An example where this would
* fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and
* sent on the wire).
*
* @param partition partition data
* @return Records or empty record if the records in PartitionData is null.
*/
public static Records recordsOrFail(ShareFetchResponseData.PartitionData partition) {
if (partition.records() == null) return MemoryRecords.EMPTY;
if (partition.records() instanceof Records) return (Records) partition.records();
throw new ClassCastException("The record type is " + partition.records().getClass().getSimpleName() + ", which is not a subtype of " +
Records.class.getSimpleName() + ". This method is only safe to call if the `ShareFetchResponse` was deserialized from bytes.");
}
/**
* Convenience method to find the size of a response.
*
* @param version The version of the request
* @param partIterator The partition iterator.
* @return The response size in bytes.
*/
public static int sizeOf(short version,
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
// use arbitrary values here without affecting the result.
ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList());
ObjectSerializationCache cache = new ObjectSerializationCache();
return 4 + data.size(cache, version);
}
/**
* @return The size in bytes of the records. 0 is returned if records of input partition is null.
*/
public static int recordsSize(ShareFetchResponseData.PartitionData partition) {
return partition.records() == null ? 0 : partition.records().sizeInBytes();
}
public static ShareFetchResponse of(Errors error,
int throttleTimeMs,
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData,
List<Node> nodeEndpoints) {
return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
}
public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator,
List<Node> nodeEndpoints) {
Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> topicResponseList = new LinkedHashMap<>();
while (partIterator.hasNext()) {
Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> entry = partIterator.next();
ShareFetchResponseData.PartitionData partitionData = entry.getValue();
// Since PartitionData alone doesn't know the partition ID, we set it here
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
// Checking if the topic is already present in the map
if (topicResponseList.containsKey(entry.getKey().topicId())) {
topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
} else {
List<ShareFetchResponseData.PartitionData> partitionResponses = new ArrayList<>();
partitionResponses.add(partitionData);
topicResponseList.put(entry.getKey().topicId(), new ShareFetchResponseData.ShareFetchableTopicResponse()
.setTopicId(entry.getKey().topicId())
.setPartitions(partitionResponses));
}
}
ShareFetchResponseData data = new ShareFetchResponseData();
// KafkaApis should only pass in node endpoints on error, otherwise this should be an empty list
nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(
new ShareFetchResponseData.NodeEndpoint()
.setNodeId(endpoint.id())
.setHost(endpoint.host())
.setPort(endpoint.port())
.setRack(endpoint.rack())));
return data.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
.setResponses(new ArrayList<>(topicResponseList.values()));
}
public static ShareFetchResponseData.PartitionData partitionResponse(TopicIdPartition topicIdPartition, Errors error) {
return partitionResponse(topicIdPartition.topicPartition().partition(), error);
}
public static ShareFetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
return new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(error.code());
}
}
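
A hedged sketch of how a broker-side caller might assemble a response using the of and partitionResponse helpers above; the topic and error chosen here are arbitrary:

// Sketch only: builds a ShareFetchResponse that reports a per-partition error.
import java.util.Collections;
import java.util.LinkedHashMap;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;

public class ShareFetchResponseExample {
    public static void main(String[] args) {
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("payments", 0));
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(tip, ShareFetchResponse.partitionResponse(tip, Errors.NOT_LEADER_OR_FOLLOWER));

        ShareFetchResponse response =
            ShareFetchResponse.of(Errors.NONE, 0, partitions, Collections.emptyList());
        // Counts the top-level NONE and the partition-level NOT_LEADER_OR_FOLLOWER.
        System.out.println(response.errorCounts());
    }
}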

clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
public class ShareGroupDescribeRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ShareGroupDescribeRequest> {
private final ShareGroupDescribeRequestData data;
public Builder(ShareGroupDescribeRequestData data) {
this(data, false);
}
public Builder(ShareGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_GROUP_DESCRIBE, enableUnstableLastVersion);
this.data = data;
}
@Override
public ShareGroupDescribeRequest build(short version) {
return new ShareGroupDescribeRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ShareGroupDescribeRequestData data;
public ShareGroupDescribeRequest(ShareGroupDescribeRequestData data, short version) {
super(ApiKeys.SHARE_GROUP_DESCRIBE, version);
this.data = data;
}
@Override
public ShareGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ShareGroupDescribeResponseData data = new ShareGroupDescribeResponseData()
.setThrottleTimeMs(throttleTimeMs);
// Set error for each group
short errorCode = Errors.forException(e).code();
this.data.groupIds().forEach(
groupId -> data.groups().add(
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(errorCode)
)
);
return new ShareGroupDescribeResponse(data);
}
@Override
public ShareGroupDescribeRequestData data() {
return data;
}
public static ShareGroupDescribeRequest parse(ByteBuffer buffer, short version) {
return new ShareGroupDescribeRequest(
new ShareGroupDescribeRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
public static List<ShareGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
List<String> groupIds,
Errors error
) {
return groupIds.stream()
.map(groupId -> new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(error.code())
).collect(Collectors.toList());
}
}

clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
*/
public class ShareGroupDescribeResponse extends AbstractResponse {
private final ShareGroupDescribeResponseData data;
public ShareGroupDescribeResponse(ShareGroupDescribeResponseData data) {
super(ApiKeys.SHARE_GROUP_DESCRIBE);
this.data = data;
}
@Override
public ShareGroupDescribeResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
data.groups().forEach(
group -> updateErrorCounts(counts, Errors.forCode(group.errorCode()))
);
return counts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ShareGroupDescribeResponse parse(ByteBuffer buffer, short version) {
return new ShareGroupDescribeResponse(
new ShareGroupDescribeResponseData(new ByteBufferAccessor(buffer), version)
);
}
}

clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatRequest.java

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import java.nio.ByteBuffer;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
public class ShareGroupHeartbeatRequest extends AbstractRequest {
/**
* A member epoch of <code>-1</code> means that the member wants to leave the group.
*/
public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
/**
* A member epoch of <code>0</code> means that the member wants to join the group.
*/
public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
public static class Builder extends AbstractRequest.Builder<ShareGroupHeartbeatRequest> {
private final ShareGroupHeartbeatRequestData data;
public Builder(ShareGroupHeartbeatRequestData data) {
this(data, true);
}
public Builder(ShareGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_GROUP_HEARTBEAT, enableUnstableLastVersion);
this.data = data;
}
@Override
public ShareGroupHeartbeatRequest build(short version) {
return new ShareGroupHeartbeatRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ShareGroupHeartbeatRequestData data;
public ShareGroupHeartbeatRequest(ShareGroupHeartbeatRequestData data, short version) {
super(ApiKeys.SHARE_GROUP_HEARTBEAT, version);
this.data = data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new ShareGroupHeartbeatResponse(
new ShareGroupHeartbeatResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
);
}
@Override
public ShareGroupHeartbeatRequestData data() {
return data;
}
public static ShareGroupHeartbeatRequest parse(ByteBuffer buffer, short version) {
return new ShareGroupHeartbeatRequest(new ShareGroupHeartbeatRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
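
A heavily hedged sketch of building a join heartbeat with the epoch constants above. The ShareGroupHeartbeatRequest.json schema is not shown in this excerpt, so the setGroupId/setMemberId/setMemberEpoch setters are assumptions based on KIP-932 and the usual conventions of Kafka's generated message classes; the group ID is made up:

// Sketch only: the ShareGroupHeartbeatRequestData setters used here are assumed, not confirmed
// by this excerpt.
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;

public class ShareGroupHeartbeatRequestExample {
    public static void main(String[] args) {
        ShareGroupHeartbeatRequestData data = new ShareGroupHeartbeatRequestData()
                .setGroupId("my-share-group")
                .setMemberId(Uuid.randomUuid().toString())
                .setMemberEpoch(ShareGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH); // 0 = join
        ShareGroupHeartbeatRequest request = new ShareGroupHeartbeatRequest.Builder(data).build((short) 0);
        System.out.println(request.data());
        // A member leaving the group would instead send LEAVE_GROUP_MEMBER_EPOCH (-1).
    }
}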

clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
*/
public class ShareGroupHeartbeatResponse extends AbstractResponse {
private final ShareGroupHeartbeatResponseData data;
public ShareGroupHeartbeatResponse(ShareGroupHeartbeatResponseData data) {
super(ApiKeys.SHARE_GROUP_HEARTBEAT);
this.data = data;
}
@Override
public ShareGroupHeartbeatResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ShareGroupHeartbeatResponse parse(ByteBuffer buffer, short version) {
return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData(
new ByteBufferAccessor(buffer), version));
}
}

clients/src/main/resources/common/message/FindCoordinatorRequest.json

@@ -27,7 +27,9 @@
// Version 4 adds support for batching via CoordinatorKeys (KIP-699)
//
// Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
//
// Version 6 adds support for share groups (KIP-932).
"validVersions": "0-6",
"deprecatedVersions": "0",
"flexibleVersions": "3+",
"fields": [

clients/src/main/resources/common/message/FindCoordinatorResponse.json

@@ -26,7 +26,9 @@
// Version 4 adds support for batching via Coordinators (KIP-699)
//
// Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
//
// Version 6 adds support for share groups (KIP-932).
"validVersions": "0-6",
"flexibleVersions": "3+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

clients/src/main/resources/common/message/ListGroupsRequest.json

@@ -25,7 +25,9 @@
// Version 4 adds the StatesFilter field (KIP-518).
//
// Version 5 adds the TypesFilter field (KIP-848).
"validVersions": "0-5",
//
// Version 6 adds support for share groups (KIP-932).
"validVersions": "0-6",
"flexibleVersions": "3+",
"fields": [
{ "name": "StatesFilter", "type": "[]string", "versions": "4+",

clients/src/main/resources/common/message/ListGroupsResponse.json

@@ -27,7 +27,9 @@
// Version 4 adds the GroupState field (KIP-518).
//
// Version 5 adds the GroupType field (KIP-848).
"validVersions": "0-5",
//
// Version 6 adds support for share groups (KIP-932).
"validVersions": "0-6",
"flexibleVersions": "3+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

clients/src/main/resources/common/message/ShareAcknowledgeRequest.json

@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 79,
"type": "request",
"listeners": ["broker"],
"name": "ShareAcknowledgeRequest",
"validVersions": "0",
"flexibleVersions": "0+",
// The ShareAcknowledgeRequest API is added as part of KIP-932 and is still under
// development. Hence, the API is not exposed by default by brokers unless
// explicitly enabled.
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The member ID." },
{ "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
"about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
{ "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
"about": "The topics containing records to acknowledge.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
"about": "The partitions containing records to acknowledge.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "First offset of batch of records to acknowledge."},
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "Last offset (inclusive) of batch of records to acknowledge."},
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
"about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
]}
]}
]}
]
}
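A minimal sketch of filling in this schema through the generated ShareAcknowledgeRequestData class. The fluent setters mirror those exercised in RequestResponseTest later in this PR; setShareSessionEpoch is assumed to follow the generator's field-name convention, and the group ID, topic ID and offset range are placeholders.

import java.util.Collections;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;

public class ShareAcknowledgeRequestSketch {
    // Acknowledges offsets 0-9 of partition 0 as Accept (type 1) within an existing share session.
    static ShareAcknowledgeRequestData buildRequest(Uuid topicId, String memberId) {
        return new ShareAcknowledgeRequestData()
            .setGroupId("share-group")            // placeholder group ID
            .setMemberId(memberId)
            .setShareSessionEpoch(1)              // 0 opens a session, -1 closes it, otherwise increments per request
            .setTopics(Collections.singletonList(new ShareAcknowledgeRequestData.AcknowledgeTopic()
                .setTopicId(topicId)
                .setPartitions(Collections.singletonList(new ShareAcknowledgeRequestData.AcknowledgePartition()
                    .setPartitionIndex(0)
                    .setAcknowledgementBatches(Collections.singletonList(new ShareAcknowledgeRequestData.AcknowledgementBatch()
                        .setFirstOffset(0L)
                        .setLastOffset(9L)
                        .setAcknowledgeTypes(Collections.singletonList((byte) 1)))))))); // 1 = Accept
    }
}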

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 79,
"type": "response",
"name": "ShareAcknowledgeResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - SHARE_SESSION_NOT_FOUND (version 0+)
// - INVALID_SHARE_SESSION_EPOCH (version 0+)
// - NOT_LEADER_OR_FOLLOWER (version 0+)
// - UNKNOWN_TOPIC_ID (version 0+)
// - INVALID_RECORD_STATE (version 0+)
// - KAFKA_STORAGE_ERROR (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_SERVER_ERROR (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
"about": "The top level response error code." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
"about": "The response topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The topic partitions.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." },
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "0+",
"about": "The ID of the current leader or -1 if the leader is unknown." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The latest known leader epoch." }
]}
]}
]},
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
"about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "0+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
{ "name": "Host", "type": "string", "versions": "0+",
"about": "The node's hostname." },
{ "name": "Port", "type": "int32", "versions": "0+",
"about": "The node's port." },
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The rack of the node, or null if it has not been assigned to a rack." }
]}
]
}
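A sketch of walking this response, assuming the generated accessors are named after the fields above (responses(), partitions(), errorCode(), currentLeader()). Partitions that return NOT_LEADER_OR_FOLLOWER can be retried against the leaders advertised in NodeEndpoints.

import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.protocol.Errors;

public class ShareAcknowledgeResponseSketch {
    // Reports per-partition acknowledge errors and the new-leader hint for moved partitions.
    static void handle(ShareAcknowledgeResponseData data) {
        for (ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse topic : data.responses()) {
            for (ShareAcknowledgeResponseData.PartitionData partition : topic.partitions()) {
                Errors error = Errors.forCode(partition.errorCode());
                if (error == Errors.NOT_LEADER_OR_FOLLOWER) {
                    // NodeEndpoints in the response carries host/port for this node ID.
                    System.out.println("Partition " + partition.partitionIndex()
                        + " moved to node " + partition.currentLeader().leaderId());
                } else if (error != Errors.NONE) {
                    System.out.println("Partition " + partition.partitionIndex() + " failed: " + error);
                }
            }
        }
    }
}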

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 78,
"type": "request",
"listeners": ["broker"],
"name": "ShareFetchRequest",
"validVersions": "0",
"flexibleVersions": "0+",
// The ShareFetchRequest API is added as part of KIP-932 and is still under
// development. Hence, the API is not exposed by default by brokers unless
// explicitly enabled.
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The member ID." },
{ "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
"about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
{ "name": "MaxWaitMs", "type": "int32", "versions": "0+",
"about": "The maximum time in milliseconds to wait for the response." },
{ "name": "MinBytes", "type": "int32", "versions": "0+",
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
"about": "The partitions to fetch.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "First offset of batch of records to acknowledge."},
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "Last offset (inclusive) of batch of records to acknowledge."},
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
"about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
]}
]}
]},
{ "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
"about": "The partitions to remove from this share session.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions indexes to forget." }
]}
]
}
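A sketch showing how a fetch can piggyback acknowledgements for previously acquired records, assuming the generated ShareFetchRequestData setters follow the field names above; a PartitionMaxBytes of 0 would turn this into an acknowledge-only request. The group ID, topic ID and sizes are placeholders.

import java.util.Collections;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchRequestData;

public class ShareFetchRequestSketch {
    // Fetches from partition 0 while releasing offsets 0-4 from the previous delivery (type 2 = Release).
    static ShareFetchRequestData buildRequest(Uuid topicId, String memberId, int sessionEpoch) {
        return new ShareFetchRequestData()
            .setGroupId("share-group")          // placeholder group ID
            .setMemberId(memberId)
            .setShareSessionEpoch(sessionEpoch) // 0 opens the share session, -1 closes it
            .setMaxWaitMs(500)
            .setMinBytes(1)
            .setMaxBytes(1024 * 1024)
            .setTopics(Collections.singletonList(new ShareFetchRequestData.FetchTopic()
                .setTopicId(topicId)
                .setPartitions(Collections.singletonList(new ShareFetchRequestData.FetchPartition()
                    .setPartitionIndex(0)
                    .setPartitionMaxBytes(64 * 1024) // 0 here would mean acknowledge-only, no fetch
                    .setAcknowledgementBatches(Collections.singletonList(new ShareFetchRequestData.AcknowledgementBatch()
                        .setFirstOffset(0L)
                        .setLastOffset(4L)
                        .setAcknowledgeTypes(Collections.singletonList((byte) 2)))))))); // 2 = Release
    }
}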

View File

@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 78,
"type": "response",
"name": "ShareFetchResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors for ErrorCode and AcknowledgeErrorCode:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - SHARE_SESSION_NOT_FOUND (version 0+)
// - INVALID_SHARE_SESSION_EPOCH (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - NOT_LEADER_OR_FOLLOWER (version 0+)
// - UNKNOWN_TOPIC_ID (version 0+)
// - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
// - KAFKA_STORAGE_ERROR (version 0+)
// - CORRUPT_MESSAGE (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_SERVER_ERROR (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
"about": "The top-level response error code." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
"about": "The response topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The topic partitions.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The fetch error code, or 0 if there was no fetch error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The fetch error message, or null if there was no fetch error." },
{ "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
"about": "The acknowledge error code, or 0 if there was no acknowledge error." },
{ "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The acknowledge error message, or null if there was no acknowledge error." },
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "0+",
"about": "The ID of the current leader or -1 if the leader is unknown." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The latest known leader epoch." }
]},
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
{ "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [
{"name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
{"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
{"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
]}
]}
]},
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
"about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "0+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
{ "name": "Host", "type": "string", "versions": "0+",
"about": "The node's hostname." },
{ "name": "Port", "type": "int32", "versions": "0+",
"about": "The node's port." },
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The rack of the node, or null if it has not been assigned to a rack." }
]}
]
}
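The AcquiredRecords ranges tell the member which offsets it now holds and how many times each batch has been delivered, alongside the raw record data. A sketch of reading them, assuming the generated accessors mirror the field names above:

import org.apache.kafka.common.message.ShareFetchResponseData;

public class ShareFetchResponseSketch {
    // Prints the offset ranges this member has acquired, with their delivery counts.
    static void printAcquired(ShareFetchResponseData data) {
        for (ShareFetchResponseData.ShareFetchableTopicResponse topic : data.responses()) {
            for (ShareFetchResponseData.PartitionData partition : topic.partitions()) {
                for (ShareFetchResponseData.AcquiredRecords batch : partition.acquiredRecords()) {
                    System.out.println("topic " + topic.topicId()
                        + " partition " + partition.partitionIndex()
                        + " offsets " + batch.firstOffset() + "-" + batch.lastOffset()
                        + " deliveryCount " + batch.deliveryCount());
                }
            }
        }
    }
}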

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 77,
"type": "request",
"listeners": ["broker"],
"name": "ShareGroupDescribeRequest",
"validVersions": "0",
"flexibleVersions": "0+",
// The ShareGroupDescribeRequest API is added as part of KIP-932 and is still under
// development. Hence, the API is not exposed by default by brokers unless
// explicitly enabled.
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The ids of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include authorized operations." }
]
}

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 77,
"type": "response",
"name": "ShareGroupDescribeResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
"about": "Each described group.",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The describe error, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group ID string." },
{ "name": "GroupState", "type": "string", "versions": "0+",
"about": "The group state string, or the empty string." },
{ "name": "GroupEpoch", "type": "int32", "versions": "0+",
"about": "The group epoch." },
{ "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
"about": "The assignment epoch." },
{ "name": "AssignorName", "type": "string", "versions": "0+",
"about": "The selected assignor." },
{ "name": "Members", "type": "[]Member", "versions": "0+",
"about": "The members.",
"fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member rack ID." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch." },
{ "name": "ClientId", "type": "string", "versions": "0+",
"about": "The client ID." },
{ "name": "ClientHost", "type": "string", "versions": "0+",
"about": "The client host." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
"about": "The subscribed topic names." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+",
"about": "The current assignment." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
]
}
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]},
{ "name": "Assignment", "versions": "0+", "fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The assigned topic-partitions to the member." }
]}
]
}
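A sketch pairing the describe request with a simple read of the response, assuming the generated ShareGroupDescribe* classes expose setters and accessors named after the fields above:

import java.util.Collections;

import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;

public class ShareGroupDescribeSketch {
    // Asks the group coordinator to describe a single share group.
    static ShareGroupDescribeRequestData buildRequest(String groupId) {
        return new ShareGroupDescribeRequestData()
            .setGroupIds(Collections.singletonList(groupId))
            .setIncludeAuthorizedOperations(false);
    }

    // Prints the state, epoch and member count of each described group.
    static void printGroups(ShareGroupDescribeResponseData data) {
        for (ShareGroupDescribeResponseData.DescribedGroup group : data.groups()) {
            System.out.println(group.groupId() + ": state=" + group.groupState()
                + " epoch=" + group.groupEpoch()
                + " members=" + group.members().size());
        }
    }
}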

View File

@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 76,
"type": "request",
"listeners": ["broker"],
"name": "ShareGroupHeartbeatRequest",
"validVersions": "0",
"flexibleVersions": "0+",
// The ShareGroupHeartbeatRequest API is added as part of KIP-932 and is still under
// development. Hence, the API is not exposed by default by brokers unless
// explicitly enabled.
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch; 0 to join the group; -1 to leave the group." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
]
}
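The MemberEpoch field drives the member lifecycle: 0 joins, -1 leaves, and otherwise the member echoes the epoch returned by the coordinator. A sketch of the three cases, assuming the generated setters follow the field names above; sending an empty member ID on join is an assumption, since the coordinator returns the real ID in the first response.

import java.util.Arrays;

import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;

public class ShareGroupHeartbeatRequestSketch {
    // First heartbeat: MemberEpoch 0 joins the group and sends the full subscription.
    static ShareGroupHeartbeatRequestData join(String groupId) {
        return new ShareGroupHeartbeatRequestData()
            .setGroupId(groupId)
            .setMemberId("")   // assumed empty on join; the coordinator generates the member ID
            .setMemberEpoch(0)
            .setSubscribedTopicNames(Arrays.asList("foo", "bar"));
    }

    // Steady state: only the latest epoch; unchanged fields (rack, subscription) stay null.
    static ShareGroupHeartbeatRequestData heartbeat(String groupId, String memberId, int memberEpoch) {
        return new ShareGroupHeartbeatRequestData()
            .setGroupId(groupId)
            .setMemberId(memberId)
            .setMemberEpoch(memberEpoch);
    }

    // Leaving: MemberEpoch -1 asks the coordinator to remove the member.
    static ShareGroupHeartbeatRequestData leave(String groupId, String memberId) {
        return new ShareGroupHeartbeatRequestData()
            .setGroupId(groupId)
            .setMemberId(memberId)
            .setMemberEpoch(-1);
    }
}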

View File

@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 76,
"type": "response",
"name": "ShareGroupHeartbeatResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_MEMBER_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The member epoch." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
"about": "The heartbeat interval in milliseconds." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided; the assignment otherwise.", "fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The partitions assigned to the member." }
]}
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]}
]
}
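A sketch of applying the coordinator's answer, assuming the generated accessors mirror the field names above; a null Assignment means the assignment is unchanged since the last heartbeat.

import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;

public class ShareGroupHeartbeatResponseSketch {
    // Remembers the epoch for the next heartbeat and installs a new assignment only when one is present.
    static void handle(ShareGroupHeartbeatResponseData data) {
        if (data.errorCode() != Errors.NONE.code()) {
            System.out.println("Heartbeat failed: " + Errors.forCode(data.errorCode()));
            return;
        }
        if (data.assignment() != null) {
            for (ShareGroupHeartbeatResponseData.TopicPartitions tp : data.assignment().topicPartitions()) {
                System.out.println("Assigned partitions " + tp.partitions() + " of topic " + tp.topicId());
            }
        }
        System.out.println("Next heartbeat in " + data.heartbeatIntervalMs()
            + " ms at member epoch " + data.memberEpoch());
    }
}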

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
@ -210,6 +211,14 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState;
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState;
import org.apache.kafka.common.message.StopReplicaResponseData;
@ -1001,6 +1010,10 @@ public class RequestResponseTest {
assertEquals(1, createTxnOffsetCommitResponse().errorCounts().get(Errors.NONE));
assertEquals(1, createUpdateMetadataResponse().errorCounts().get(Errors.NONE));
assertEquals(1, createWriteTxnMarkersResponse().errorCounts().get(Errors.NONE));
assertEquals(1, createShareGroupHeartbeatResponse().errorCounts().get(Errors.NONE));
assertEquals(1, createShareGroupDescribeResponse().errorCounts().get(Errors.NONE));
assertEquals(2, createShareFetchResponse().errorCounts().get(Errors.NONE));
assertEquals(2, createShareAcknowledgeResponse().errorCounts().get(Errors.NONE));
}
private AbstractRequest getRequest(ApiKeys apikey, short version) {
@ -1081,6 +1094,10 @@ public class RequestResponseTest {
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsRequest(version);
case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesRequest(version);
case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsRequest(version);
case SHARE_GROUP_HEARTBEAT: return createShareGroupHeartbeatRequest(version);
case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeRequest(version);
case SHARE_FETCH: return createShareFetchRequest(version);
case SHARE_ACKNOWLEDGE: return createShareAcknowledgeRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1163,6 +1180,10 @@ public class RequestResponseTest {
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsResponse();
case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesResponse();
case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsResponse();
case SHARE_GROUP_HEARTBEAT: return createShareGroupHeartbeatResponse();
case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeResponse();
case SHARE_FETCH: return createShareFetchResponse();
case SHARE_ACKNOWLEDGE: return createShareAcknowledgeResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1330,6 +1351,114 @@ public class RequestResponseTest {
return new ConsumerGroupHeartbeatResponse(data);
}
private ShareGroupHeartbeatRequest createShareGroupHeartbeatRequest(short version) {
ShareGroupHeartbeatRequestData data = new ShareGroupHeartbeatRequestData()
.setGroupId("group")
.setMemberId("memberid")
.setMemberEpoch(10)
.setRackId("rackid")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"));
return new ShareGroupHeartbeatRequest.Builder(data).build(version);
}
private ShareGroupHeartbeatResponse createShareGroupHeartbeatResponse() {
ShareGroupHeartbeatResponseData data = new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(1000)
.setMemberId("memberid")
.setMemberEpoch(11)
.setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Arrays.asList(
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(0, 1, 2)),
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(3, 4, 5))
))
);
return new ShareGroupHeartbeatResponse(data);
}
private ShareGroupDescribeRequest createShareGroupDescribeRequest(short version) {
ShareGroupDescribeRequestData data = new ShareGroupDescribeRequestData()
.setGroupIds(Collections.singletonList("group"))
.setIncludeAuthorizedOperations(false);
return new ShareGroupDescribeRequest.Builder(data).build(version);
}
private ShareGroupDescribeResponse createShareGroupDescribeResponse() {
ShareGroupDescribeResponseData data = new ShareGroupDescribeResponseData()
.setGroups(Collections.singletonList(
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("group")
.setErrorCode((short) 0)
.setErrorMessage(Errors.forCode((short) 0).message())
.setGroupState(ShareGroupState.EMPTY.toString())
.setMembers(new ArrayList<>(0))
))
.setThrottleTimeMs(1000);
return new ShareGroupDescribeResponse(data);
}
private ShareFetchRequest createShareFetchRequest(short version) {
ShareFetchRequestData data = new ShareFetchRequestData()
.setGroupId("group")
.setMemberId(Uuid.randomUuid().toString())
.setTopics(singletonList(new ShareFetchRequestData.FetchTopic()
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(0)))));
return new ShareFetchRequest.Builder(data).build(version);
}
private ShareFetchResponse createShareFetchResponse() {
ShareFetchResponseData data = new ShareFetchResponseData();
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("blah".getBytes()));
ShareFetchResponseData.PartitionData partition = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
.setRecords(records)
.setAcquiredRecords(singletonList(new ShareFetchResponseData.AcquiredRecords()
.setFirstOffset(0)
.setLastOffset(0)
.setDeliveryCount((short) 1)));
ShareFetchResponseData.ShareFetchableTopicResponse response = new ShareFetchResponseData.ShareFetchableTopicResponse()
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(partition));
data.setResponses(singletonList(response));
data.setThrottleTimeMs(345);
data.setErrorCode(Errors.NONE.code());
return new ShareFetchResponse(data);
}
private ShareAcknowledgeRequest createShareAcknowledgeRequest(short version) {
ShareAcknowledgeRequestData data = new ShareAcknowledgeRequestData()
.setMemberId(Uuid.randomUuid().toString())
.setTopics(singletonList(new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitionIndex(0)
.setAcknowledgementBatches(singletonList(new ShareAcknowledgeRequestData.AcknowledgementBatch()
.setFirstOffset(0)
.setLastOffset(0)
.setAcknowledgeTypes(Collections.singletonList((byte) 0))))))));
return new ShareAcknowledgeRequest.Builder(data).build(version);
}
private ShareAcknowledgeResponse createShareAcknowledgeResponse() {
ShareAcknowledgeResponseData data = new ShareAcknowledgeResponseData();
data.setResponses(singletonList(new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse()
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))));
data.setThrottleTimeMs(345);
data.setErrorCode(Errors.NONE.code());
return new ShareAcknowledgeResponse(data);
}
private ControllerRegistrationRequest createControllerRegistrationRequest(short version) {
ControllerRegistrationRequestData data = new ControllerRegistrationRequestData().
setControllerId(3).

View File

@ -95,6 +95,10 @@ object RequestConvertToJson {
case req: RenewDelegationTokenRequest => RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
case req: SaslAuthenticateRequest => SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version)
case req: SaslHandshakeRequest => SaslHandshakeRequestDataJsonConverter.write(req.data, request.version)
case req: ShareAcknowledgeRequest => ShareAcknowledgeRequestDataJsonConverter.write(req.data, request.version)
case req: ShareFetchRequest => ShareFetchRequestDataJsonConverter.write(req.data, request.version)
case req: ShareGroupDescribeRequest => ShareGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
case req: ShareGroupHeartbeatRequest => ShareGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
case req: StopReplicaRequest => StopReplicaRequestDataJsonConverter.write(req.data, request.version)
case req: SyncGroupRequest => SyncGroupRequestDataJsonConverter.write(req.data, request.version)
case req: TxnOffsetCommitRequest => TxnOffsetCommitRequestDataJsonConverter.write(req.data, request.version)
@ -178,6 +182,10 @@ object RequestConvertToJson {
case res: RenewDelegationTokenResponse => RenewDelegationTokenResponseDataJsonConverter.write(res.data, version)
case res: SaslAuthenticateResponse => SaslAuthenticateResponseDataJsonConverter.write(res.data, version)
case res: SaslHandshakeResponse => SaslHandshakeResponseDataJsonConverter.write(res.data, version)
case res: ShareAcknowledgeResponse => ShareAcknowledgeResponseDataJsonConverter.write(res.data, version)
case res: ShareFetchResponse => ShareFetchResponseDataJsonConverter.write(res.data, version)
case res: ShareGroupDescribeResponse => ShareGroupDescribeResponseDataJsonConverter.write(res.data, version)
case res: ShareGroupHeartbeatResponse => ShareGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
case res: StopReplicaResponse => StopReplicaResponseDataJsonConverter.write(res.data, version)
case res: SyncGroupResponse => SyncGroupResponseDataJsonConverter.write(res.data, version)
case res: TxnOffsetCommitResponse => TxnOffsetCommitResponseDataJsonConverter.write(res.data, version)

View File

@ -95,7 +95,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
@ClusterTemplate("testApiVersionsRequestIncludesUnreleasedApisTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true"),
))
def testApiVersionsRequestIncludesUnreleasedApis(): Unit = {

View File

@ -704,10 +704,10 @@ class RequestQuotaTest extends BaseRequestTest {
new ConsumerGroupDescribeRequest.Builder(new ConsumerGroupDescribeRequestData(), true)
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS =>
new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true)
new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData())
case ApiKeys.PUSH_TELEMETRY =>
new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true)
new PushTelemetryRequest.Builder(new PushTelemetryRequestData())
case ApiKeys.ASSIGN_REPLICAS_TO_DIRS =>
new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData())
@ -718,6 +718,18 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
new DescribeTopicPartitionsRequest.Builder(new DescribeTopicPartitionsRequestData())
case ApiKeys.SHARE_GROUP_HEARTBEAT =>
new ShareGroupHeartbeatRequest.Builder(new ShareGroupHeartbeatRequestData(), true)
case ApiKeys.SHARE_GROUP_DESCRIBE =>
new ShareGroupDescribeRequest.Builder(new ShareGroupDescribeRequestData(), true)
case ApiKeys.SHARE_FETCH =>
new ShareFetchRequest.Builder(new ShareFetchRequestData(), true)
case ApiKeys.SHARE_ACKNOWLEDGE =>
new ShareAcknowledgeRequest.Builder(new ShareAcknowledgeRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}

View File

@ -2267,6 +2267,42 @@ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminc
<td>Topic</td>
<td></td>
</tr>
<tr>
<td>SHARE_GROUP_HEARTBEAT (76)</td>
<td>Read</td>
<td>Group</td>
<td></td>
</tr>
<tr>
<td>SHARE_GROUP_DESCRIBE (77)</td>
<td>Describe</td>
<td>Group</td>
<td></td>
</tr>
<tr>
<td>SHARE_FETCH (78)</td>
<td>Read</td>
<td>Group</td>
<td></td>
</tr>
<tr>
<td>SHARE_FETCH (78)</td>
<td>Read</td>
<td>Topic</td>
<td></td>
</tr>
<tr>
<td>SHARE_ACKNOWLEDGE (79)</td>
<td>Read</td>
<td>Group</td>
<td></td>
</tr>
<tr>
<td>SHARE_ACKNOWLEDGE (79)</td>
<td>Read</td>
<td>Topic</td>
<td></td>
</tr>
</tbody>
</table>