KAFKA-14391; Add ConsumerGroupHeartbeat API (#12972)

This patch does a few things:
1) It introduces a new flag to the request spec: `latestVersionUnstable`. It signifies that the last version of the API is considered unstable (or still in development). As such, the last API version is not exposed by the server unless specified otherwise with the new internal `unstable.api.versions.enable`. This allows us to commit new APIs which are still in development.
3) It adds the ConsumerGroupHeartbeat API, part of KIP-848, and marks it as unreleased for now.
4) It adds the new error codes required by the new ConsumerGroupHeartbeat API.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2023-02-09 09:13:31 +01:00 committed by GitHub
parent 659dd2e49f
commit 3be7f7d611
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 778 additions and 79 deletions

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class FencedMemberEpochException extends ApiException {
public FencedMemberEpochException(String message) {
super(message);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class UnreleasedInstanceIdException extends ApiException {
public UnreleasedInstanceIdException(String message) {
super(message);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class UnsupportedAssignorException extends ApiException {
public UnsupportedAssignorException(String message) {
super(message);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.protocol; package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.RecordBatch;
@ -27,6 +28,7 @@ import java.util.EnumMap;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -108,7 +110,8 @@ public enum ApiKeys {
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true), UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true); ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true),
CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER = private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class); new EnumMap<>(ApiMessageType.ListenerType.class);
@ -193,7 +196,11 @@ public enum ApiKeys {
} }
public short latestVersion() { public short latestVersion() {
return messageType.highestSupportedVersion(); return messageType.highestSupportedVersion(true);
}
public short latestVersion(boolean enableUnstableLastVersion) {
return messageType.highestSupportedVersion(enableUnstableLastVersion);
} }
public short oldestVersion() { public short oldestVersion() {
@ -212,6 +219,30 @@ public enum ApiKeys {
return apiVersion >= oldestVersion() && apiVersion <= latestVersion(); return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
} }
public boolean isVersionEnabled(short apiVersion, boolean enableUnstableLastVersion) {
// ApiVersions API is a particular case. The client always send the highest version
// that it supports and the server fails back to version 0 if it does not know it.
// Hence, we have to accept any versions here, even unsupported ones.
if (this == ApiKeys.API_VERSIONS) return true;
return apiVersion >= oldestVersion() && apiVersion <= latestVersion(enableUnstableLastVersion);
}
public Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean enableUnstableLastVersion) {
short oldestVersion = oldestVersion();
short latestVersion = latestVersion(enableUnstableLastVersion);
// API is entirely disabled if latestStableVersion is smaller than oldestVersion.
if (latestVersion >= oldestVersion) {
return Optional.of(new ApiVersionsResponseData.ApiVersion()
.setApiKey(messageType.apiKey())
.setMinVersion(oldestVersion)
.setMaxVersion(latestVersion));
} else {
return Optional.empty();
}
}
public short requestHeaderVersion(short apiVersion) { public short requestHeaderVersion(short apiVersion) {
return messageType.requestHeaderVersion(apiVersion); return messageType.requestHeaderVersion(apiVersion);
} }
@ -288,5 +319,4 @@ public enum ApiKeys {
.collect(Collectors.toList()); .collect(Collectors.toList());
return EnumSet.copyOf(apis); return EnumSet.copyOf(apis);
} }
} }

View File

@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
import org.apache.kafka.common.errors.FeatureUpdateFailedException; import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedLeaderEpochException; import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException; import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.FetchSessionTopicIdException; import org.apache.kafka.common.errors.FetchSessionTopicIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException;
@ -122,7 +123,9 @@ import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException; import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException; import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
@ -372,7 +375,10 @@ public enum Errors {
FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new), FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new), INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new), NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new),
OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new); OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new),
FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoin.", FencedMemberEpochException::new),
UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another member in the consumer group. That member must leave first.", UnreleasedInstanceIdException::new),
UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class); private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -303,6 +303,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return ListTransactionsRequest.parse(buffer, apiVersion); return ListTransactionsRequest.parse(buffer, apiVersion);
case ALLOCATE_PRODUCER_IDS: case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsRequest.parse(buffer, apiVersion); return AllocateProducerIdsRequest.parse(buffer, apiVersion);
case CONSUMER_GROUP_HEARTBEAT:
return ConsumerGroupHeartbeatRequest.parse(buffer, apiVersion);
default: default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey)); "code should be updated to do so.", apiKey));

View File

@ -247,6 +247,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return ListTransactionsResponse.parse(responseBuffer, version); return ListTransactionsResponse.parse(responseBuffer, version);
case ALLOCATE_PRODUCER_IDS: case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsResponse.parse(responseBuffer, version); return AllocateProducerIdsResponse.parse(responseBuffer, version);
case CONSUMER_GROUP_HEARTBEAT:
return ConsumerGroupHeartbeatResponse.parse(responseBuffer, version);
default: default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey)); "code should be updated to do so.", apiKey));

View File

@ -116,7 +116,23 @@ public class ApiVersionsResponse extends AbstractResponse {
int throttleTimeMs, int throttleTimeMs,
ApiMessageType.ListenerType listenerType ApiMessageType.ListenerType listenerType
) { ) {
return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType), Features.emptySupportedFeatures()); return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, true),
Features.emptySupportedFeatures()
);
}
public static ApiVersionsResponse defaultApiVersionsResponse(
int throttleTimeMs,
ApiMessageType.ListenerType listenerType,
boolean enableUnstableLastVersion
) {
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
Features.emptySupportedFeatures()
);
} }
public static ApiVersionsResponse createApiVersionsResponse( public static ApiVersionsResponse createApiVersionsResponse(
@ -146,14 +162,23 @@ public class ApiVersionsResponse extends AbstractResponse {
Map<String, Short> finalizedFeatures, Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch, long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions, NodeApiVersions controllerApiVersions,
ListenerType listenerType ListenerType listenerType,
boolean enableUnstableLastVersion
) { ) {
ApiVersionCollection apiKeys; ApiVersionCollection apiKeys;
if (controllerApiVersions != null) { if (controllerApiVersions != null) {
apiKeys = intersectForwardableApis( apiKeys = intersectForwardableApis(
listenerType, minRecordVersion, controllerApiVersions.allSupportedApiVersions()); listenerType,
minRecordVersion,
controllerApiVersions.allSupportedApiVersions(),
enableUnstableLastVersion
);
} else { } else {
apiKeys = filterApis(minRecordVersion, listenerType); apiKeys = filterApis(
minRecordVersion,
listenerType,
enableUnstableLastVersion
);
} }
return createApiVersionsResponse( return createApiVersionsResponse(
@ -187,20 +212,31 @@ public class ApiVersionsResponse extends AbstractResponse {
public static ApiVersionCollection filterApis( public static ApiVersionCollection filterApis(
RecordVersion minRecordVersion, RecordVersion minRecordVersion,
ApiMessageType.ListenerType listenerType ApiMessageType.ListenerType listenerType
) {
return filterApis(minRecordVersion, listenerType, false);
}
public static ApiVersionCollection filterApis(
RecordVersion minRecordVersion,
ApiMessageType.ListenerType listenerType,
boolean enableUnstableLastVersion
) { ) {
ApiVersionCollection apiKeys = new ApiVersionCollection(); ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) { for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) { if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey)); apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
} }
} }
return apiKeys; return apiKeys;
} }
public static ApiVersionCollection collectApis(Set<ApiKeys> apiKeys) { public static ApiVersionCollection collectApis(
Set<ApiKeys> apiKeys,
boolean enableUnstableLastVersion
) {
ApiVersionCollection res = new ApiVersionCollection(); ApiVersionCollection res = new ApiVersionCollection();
for (ApiKeys apiKey : apiKeys) { for (ApiKeys apiKey : apiKeys) {
res.add(ApiVersionsResponse.toApiVersion(apiKey)); apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(res::add);
} }
return res; return res;
} }
@ -212,24 +248,32 @@ public class ApiVersionsResponse extends AbstractResponse {
* @param listenerType the listener type which constrains the set of exposed APIs * @param listenerType the listener type which constrains the set of exposed APIs
* @param minRecordVersion min inter broker magic * @param minRecordVersion min inter broker magic
* @param activeControllerApiVersions controller ApiVersions * @param activeControllerApiVersions controller ApiVersions
* @param enableUnstableLastVersion whether unstable versions should be advertised or not
* @return commonly agreed ApiVersion collection * @return commonly agreed ApiVersion collection
*/ */
public static ApiVersionCollection intersectForwardableApis( public static ApiVersionCollection intersectForwardableApis(
final ApiMessageType.ListenerType listenerType, final ApiMessageType.ListenerType listenerType,
final RecordVersion minRecordVersion, final RecordVersion minRecordVersion,
final Map<ApiKeys, ApiVersion> activeControllerApiVersions final Map<ApiKeys, ApiVersion> activeControllerApiVersions,
boolean enableUnstableLastVersion
) { ) {
ApiVersionCollection apiKeys = new ApiVersionCollection(); ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) { for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) { if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
ApiVersion brokerApiVersion = toApiVersion(apiKey); final Optional<ApiVersion> brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion);
if (!brokerApiVersion.isPresent()) {
// Broker does not support this API key.
continue;
}
final ApiVersion finalApiVersion; final ApiVersion finalApiVersion;
if (!apiKey.forwardable) { if (!apiKey.forwardable) {
finalApiVersion = brokerApiVersion; finalApiVersion = brokerApiVersion.get();
} else { } else {
Optional<ApiVersion> intersectVersion = intersect(brokerApiVersion, Optional<ApiVersion> intersectVersion = intersect(
activeControllerApiVersions.getOrDefault(apiKey, null)); brokerApiVersion.get(),
activeControllerApiVersions.getOrDefault(apiKey, null)
);
if (intersectVersion.isPresent()) { if (intersectVersion.isPresent()) {
finalApiVersion = intersectVersion.get(); finalApiVersion = intersectVersion.get();
} else { } else {

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
public class ConsumerGroupHeartbeatRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
private final ConsumerGroupHeartbeatRequestData data;
public Builder(ConsumerGroupHeartbeatRequestData data) {
super(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
this.data = data;
}
@Override
public ConsumerGroupHeartbeatRequest build(short version) {
return new ConsumerGroupHeartbeatRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ConsumerGroupHeartbeatRequestData data;
public ConsumerGroupHeartbeatRequest(ConsumerGroupHeartbeatRequestData data, short version) {
super(ApiKeys.CONSUMER_GROUP_HEARTBEAT, version);
this.data = data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new ConsumerGroupHeartbeatResponse(
new ConsumerGroupHeartbeatResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
);
}
@Override
public ConsumerGroupHeartbeatRequestData data() {
return data;
}
public static ConsumerGroupHeartbeatRequest parse(ByteBuffer buffer, short version) {
return new ConsumerGroupHeartbeatRequest(new ConsumerGroupHeartbeatRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#FENCED_MEMBER_EPOCH}
* - {@link Errors#UNSUPPORTED_ASSIGNOR}
* - {@link Errors#UNRELEASED_INSTANCE_ID}
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
*/
public class ConsumerGroupHeartbeatResponse extends AbstractResponse {
private final ConsumerGroupHeartbeatResponseData data;
public ConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData data) {
super(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
this.data = data;
}
@Override
public ConsumerGroupHeartbeatResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ConsumerGroupHeartbeatResponse parse(ByteBuffer buffer, short version) {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData(
new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 68,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ConsumerGroupHeartbeatRequest",
// The ConsumerGroupHeartbeat API is added as part of KIP-848 and is still
// under developement. Hence, the API is not exposed by default by brokers
// unless explicitely enabled.
"latestVersionUnstable": true,
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." },
{ "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
{ "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
"about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
{ "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The name of the assignor." },
{ "name": "MinimumVersion", "type": "int16", "versions": "0+",
"about": "The minimum supported version for the metadata." },
{ "name": "MaximumVersion", "type": "int16", "versions": "0+",
"about": "The maximum supported version for the metadata." },
{ "name": "Reason", "type": "int8", "versions": "0+",
"about": "The reason of the metadata update." },
{ "name": "MetadataVersion", "type": "int16", "versions": "0+",
"about": "The version of the metadata." },
{ "name": "MetadataBytes", "type": "bytes", "versions": "0+",
"about": "The metadata." }
]},
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the partitions owned by the member.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]}
]
}

View File

@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 68,
"type": "response",
"name": "ConsumerGroupHeartbeatResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_MEMBER_ID (version 0+)
// - FENCED_MEMBER_EPOCH (version 0+)
// - UNSUPPORTED_ASSIGNOR (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The member epoch." },
{ "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
"about": "True if the member should compute the assignment for the group." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
"about": "The heartbeat interval in milliseconds." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided; the assignment otherwise.", "fields": [
{ "name": "Error", "type": "int8", "versions": "0+",
"about": "The assigned error." },
{ "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The partitions assigned to the member that can be used immediately." },
{ "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
{ "name": "MetadataVersion", "type": "int16", "versions": "0+",
"about": "The version of the metadata." },
{ "name": "MetadataBytes", "type": "bytes", "versions": "0+",
"about": "The assigned metadata." }
]}
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]}
]
}

View File

@ -111,7 +111,7 @@ public class ApiMessageTypeTest {
for (Schema schema : type.responseSchemas()) for (Schema schema : type.responseSchemas())
assertNotNull(schema); assertNotNull(schema);
assertEquals(type.highestSupportedVersion() + 1, type.requestSchemas().length); assertEquals(type.highestSupportedVersion(true) + 1, type.requestSchemas().length);
} }
} }

View File

@ -35,6 +35,7 @@ public class ProtoUtilsTest {
case RENEW_DELEGATION_TOKEN: case RENEW_DELEGATION_TOKEN:
case ALTER_USER_SCRAM_CREDENTIALS: case ALTER_USER_SCRAM_CREDENTIALS:
case ENVELOPE: case ENVELOPE:
case CONSUMER_GROUP_HEARTBEAT:
assertTrue(key.requiresDelayedAllocation, key + " should require delayed allocation"); assertTrue(key.requiresDelayedAllocation, key + " should require delayed allocation");
break; break;
default: default:

View File

@ -101,7 +101,8 @@ public class ApiVersionsResponseTest {
ApiVersionCollection commonResponse = ApiVersionsResponse.intersectForwardableApis( ApiVersionCollection commonResponse = ApiVersionsResponse.intersectForwardableApis(
ApiMessageType.ListenerType.ZK_BROKER, ApiMessageType.ListenerType.ZK_BROKER,
RecordVersion.current(), RecordVersion.current(),
activeControllerApiVersions activeControllerApiVersions,
true
); );
verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse); verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse);
@ -119,7 +120,8 @@ public class ApiVersionsResponseTest {
Collections.emptyMap(), Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null, null,
ListenerType.ZK_BROKER ListenerType.ZK_BROKER,
true
); );
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs()); assertEquals(10, response.throttleTimeMs());
@ -138,7 +140,8 @@ public class ApiVersionsResponseTest {
Utils.mkMap(Utils.mkEntry("feature", (short) 3)), Utils.mkMap(Utils.mkEntry("feature", (short) 3)),
10L, 10L,
null, null,
ListenerType.ZK_BROKER ListenerType.ZK_BROKER,
true
); );
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
@ -165,7 +168,8 @@ public class ApiVersionsResponseTest {
Collections.emptyMap(), Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null, null,
ListenerType.ZK_BROKER ListenerType.ZK_BROKER,
true
); );
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response)); assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
@ -183,7 +187,8 @@ public class ApiVersionsResponseTest {
Collections.emptyMap(), Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null, null,
ListenerType.ZK_BROKER ListenerType.ZK_BROKER,
true
); );
// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them

View File

@ -62,6 +62,8 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerHeartbeatResponseData; import org.apache.kafka.common.message.BrokerHeartbeatResponseData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData; import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.BrokerRegistrationResponseData; import org.apache.kafka.common.message.BrokerRegistrationResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ControlledShutdownRequestData; import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData; import org.apache.kafka.common.message.ControlledShutdownResponseData;
import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition; import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
@ -1047,6 +1049,7 @@ public class RequestResponseTest {
case DESCRIBE_TRANSACTIONS: return createDescribeTransactionsRequest(version); case DESCRIBE_TRANSACTIONS: return createDescribeTransactionsRequest(version);
case LIST_TRANSACTIONS: return createListTransactionsRequest(version); case LIST_TRANSACTIONS: return createListTransactionsRequest(version);
case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsRequest(version); case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsRequest(version);
case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey); default: throw new IllegalArgumentException("Unknown API key " + apikey);
} }
} }
@ -1121,10 +1124,51 @@ public class RequestResponseTest {
case DESCRIBE_TRANSACTIONS: return createDescribeTransactionsResponse(); case DESCRIBE_TRANSACTIONS: return createDescribeTransactionsResponse();
case LIST_TRANSACTIONS: return createListTransactionsResponse(); case LIST_TRANSACTIONS: return createListTransactionsResponse();
case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsResponse(); case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsResponse();
case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey); default: throw new IllegalArgumentException("Unknown API key " + apikey);
} }
} }
private ConsumerGroupHeartbeatRequest createConsumerGroupHeartbeatRequest(short version) {
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
.setGroupId("group")
.setMemberId("memberid")
.setMemberEpoch(10)
.setRebalanceTimeoutMs(60000)
.setServerAssignor("range")
.setRackId("rackid")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(0, 1, 2)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(3, 4, 5))
));
return new ConsumerGroupHeartbeatRequest.Builder(data).build(version);
}
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse() {
ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(1000)
.setMemberId("memberid")
.setMemberEpoch(11)
.setShouldComputeAssignment(false)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(0, 1, 2)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(3, 4, 5))
))
);
return new ConsumerGroupHeartbeatResponse(data);
}
private FetchSnapshotRequest createFetchSnapshotRequest(short version) { private FetchSnapshotRequest createFetchSnapshotRequest(short version) {
FetchSnapshotRequestData data = new FetchSnapshotRequestData() FetchSnapshotRequestData data = new FetchSnapshotRequestData()
.setClusterId("clusterId") .setClusterId("clusterId")

View File

@ -95,6 +95,7 @@ object RequestConvertToJson {
case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version) case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version) case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version) case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version)
case req: ConsumerGroupHeartbeatRequest => ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
"code should be updated to do so."); "code should be updated to do so.");
} }
@ -170,6 +171,7 @@ object RequestConvertToJson {
case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version) case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version)
case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version) case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version) case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version)
case res: ConsumerGroupHeartbeatResponse => ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
"code should be updated to do so."); "code should be updated to do so.");
} }

View File

@ -1108,10 +1108,10 @@ private[kafka] class Processor(
protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = { protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer) val header = RequestHeader.parse(buffer)
if (apiVersionManager.isApiEnabled(header.apiKey)) { if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
header header
} else { } else {
throw new InvalidRequestException(s"Received request api key ${header.apiKey} which is not enabled") throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
} }
} }

View File

@ -26,10 +26,13 @@ import org.apache.kafka.common.requests.ApiVersionsResponse
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
trait ApiVersionManager { trait ApiVersionManager {
def enableUnstableLastVersion: Boolean
def listenerType: ListenerType def listenerType: ListenerType
def enabledApis: collection.Set[ApiKeys] def enabledApis: collection.Set[ApiKeys]
def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey) def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
}
def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis) def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
} }
@ -45,7 +48,8 @@ object ApiVersionManager {
listenerType, listenerType,
forwardingManager, forwardingManager,
supportedFeatures, supportedFeatures,
metadataCache metadataCache,
config.unstableApiVersionsEnabled
) )
} }
} }
@ -53,14 +57,23 @@ object ApiVersionManager {
class SimpleApiVersionManager( class SimpleApiVersionManager(
val listenerType: ListenerType, val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys], val enabledApis: collection.Set[ApiKeys],
brokerFeatures: Features[SupportedVersionRange] brokerFeatures: Features[SupportedVersionRange],
val enableUnstableLastVersion: Boolean
) extends ApiVersionManager { ) extends ApiVersionManager {
def this(listenerType: ListenerType) = { def this(
this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures()) listenerType: ListenerType,
enableUnstableLastVersion: Boolean
) = {
this(
listenerType,
ApiKeys.apisForListener(listenerType).asScala,
BrokerFeatures.defaultSupportedFeatures(),
enableUnstableLastVersion
)
} }
private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava) private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = { override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures) ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
@ -71,9 +84,12 @@ class DefaultApiVersionManager(
val listenerType: ListenerType, val listenerType: ListenerType,
forwardingManager: Option[ForwardingManager], forwardingManager: Option[ForwardingManager],
features: BrokerFeatures, features: BrokerFeatures,
metadataCache: MetadataCache metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean
) extends ApiVersionManager { ) extends ApiVersionManager {
val enabledApis = ApiKeys.apisForListener(listenerType).asScala
override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = { override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
val supportedFeatures = features.supportedFeatures val supportedFeatures = features.supportedFeatures
val finalizedFeatures = metadataCache.features() val finalizedFeatures = metadataCache.features()
@ -86,14 +102,8 @@ class DefaultApiVersionManager(
finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
finalizedFeatures.epoch, finalizedFeatures.epoch,
controllerApiVersions.orNull, controllerApiVersions.orNull,
listenerType) listenerType,
} enableUnstableLastVersion
)
override def enabledApis: collection.Set[ApiKeys] = {
ApiKeys.apisForListener(listenerType).asScala
}
override def isApiEnabled(apiKey: ApiKeys): Boolean = {
apiKey.inScope(listenerType)
} }
} }

View File

@ -170,7 +170,10 @@ class ControllerServer(
}.toMap }.toMap
} }
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER) val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled
)
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

View File

@ -168,10 +168,10 @@ class KafkaApis(val requestChannel: RequestChannel,
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}") s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
if (!apiVersionManager.isApiEnabled(request.header.apiKey)) { if (!apiVersionManager.isApiEnabled(request.header.apiKey, request.header.apiVersion)) {
// The socket server will reject APIs which are not exposed in this scope and close the connection // The socket server will reject APIs which are not exposed in this scope and close the connection
// before handing them to the request handler, so this path should not be exercised in practice // before handing them to the request handler, so this path should not be exercised in practice
throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled") throw new IllegalStateException(s"API ${request.header.apiKey} with version ${request.header.apiVersion} is not enabled")
} }
request.header.apiKey match { request.header.apiKey match {
@ -237,6 +237,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
} }
} catch { } catch {
@ -3568,6 +3569,13 @@ class KafkaApis(val requestChannel: RequestChannel,
) )
} }
def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest]
// KIP-848 is not implemented yet so return UNSUPPORTED_VERSION.
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
private def updateRecordConversionStats(request: RequestChannel.Request, private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition, tp: TopicPartition,
conversionStats: RecordConversionStats): Unit = { conversionStats: RecordConversionStats): Unit = {

View File

@ -621,6 +621,9 @@ object KafkaConfig {
val PasswordEncoderKeyLengthProp = "password.encoder.key.length" val PasswordEncoderKeyLengthProp = "password.encoder.key.length"
val PasswordEncoderIterationsProp = "password.encoder.iterations" val PasswordEncoderIterationsProp = "password.encoder.iterations"
/** Internal Configurations **/
val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
/* Documentation */ /* Documentation */
/** ********* Zookeeper Configuration ***********/ /** ********* Zookeeper Configuration ***********/
val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " + val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +
@ -1404,6 +1407,10 @@ object KafkaConfig {
.define(RaftConfig.QUORUM_LINGER_MS_CONFIG, INT, Defaults.QuorumLingerMs, null, MEDIUM, RaftConfig.QUORUM_LINGER_MS_DOC) .define(RaftConfig.QUORUM_LINGER_MS_CONFIG, INT, Defaults.QuorumLingerMs, null, MEDIUM, RaftConfig.QUORUM_LINGER_MS_DOC)
.define(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, Defaults.QuorumRequestTimeoutMs, null, MEDIUM, RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_DOC) .define(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, Defaults.QuorumRequestTimeoutMs, null, MEDIUM, RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC) .define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC)
/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this broker.
.defineInternal(UnstableApiVersionsEnableProp, BOOLEAN, false, LOW)
} }
/** ********* Remote Log Management Configuration *********/ /** ********* Remote Log Management Configuration *********/
@ -1929,6 +1936,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val quorumRequestTimeoutMs = getInt(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG) val quorumRequestTimeoutMs = getInt(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
val quorumRetryBackoffMs = getInt(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG) val quorumRetryBackoffMs = getInt(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)
/** Internal Configurations **/
val unstableApiVersionsEnabled = getBoolean(KafkaConfig.UnstableApiVersionsEnableProp)
def addReconfigurable(reconfigurable: Reconfigurable): Unit = { def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable) dynamicConfig.addReconfigurable(reconfigurable)
} }

View File

@ -74,7 +74,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER) val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
val metaProperties = MetaProperties( val metaProperties = MetaProperties(

View File

@ -41,6 +41,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT") props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
props props
}).map(KafkaConfig.fromProps) }).map(KafkaConfig.fromProps)

View File

@ -77,7 +77,7 @@ class SocketServerTest {
// Clean-up any metrics left around by previous tests // Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics() TestUtils.clearYammerMetrics()
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER) private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager) val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES) server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket] val sockets = new ArrayBuffer[Socket]

View File

@ -21,7 +21,7 @@ import kafka.test.ClusterInstance
import org.apache.kafka.clients.NodeApiVersions import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
import org.apache.kafka.common.message.{ApiMessageType, ApiVersionsResponseData} import org.apache.kafka.common.message.ApiMessageType
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.RecordVersion import org.apache.kafka.common.record.RecordVersion
@ -68,31 +68,42 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
} finally socket.close() } finally socket.close()
} }
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = cluster.clientListener()): Unit = { def validateApiVersionsResponse(
apiVersionsResponse: ApiVersionsResponse,
listenerName: ListenerName = cluster.clientListener(),
enableUnstableLastVersion: Boolean = false
): Unit = {
val expectedApis = if (!cluster.isKRaftTest) { val expectedApis = if (!cluster.isKRaftTest) {
ApiKeys.zkBrokerApis() ApiVersionsResponse.collectApis(
ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER),
enableUnstableLastVersion
)
} else if (cluster.controllerListenerName().asScala.contains(listenerName)) { } else if (cluster.controllerListenerName().asScala.contains(listenerName)) {
ApiKeys.controllerApis() ApiVersionsResponse.collectApis(
ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
enableUnstableLastVersion
)
} else { } else {
ApiVersionsResponse.intersectForwardableApis( ApiVersionsResponse.intersectForwardableApis(
ApiMessageType.ListenerType.BROKER, ApiMessageType.ListenerType.BROKER,
RecordVersion.current, RecordVersion.current,
NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions() NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions(),
enableUnstableLastVersion
) )
} }
assertEquals(expectedApis.size(), apiVersionsResponse.data.apiKeys().size(), assertEquals(expectedApis.size, apiVersionsResponse.data.apiKeys.size,
"API keys in ApiVersionsResponse must match API keys supported by broker.") "API keys in ApiVersionsResponse must match API keys supported by broker.")
val defaultApiVersionsResponse = if (!cluster.isKRaftTest) { val defaultApiVersionsResponse = if (!cluster.isKRaftTest) {
ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.ZK_BROKER) ApiVersionsResponse.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER, enableUnstableLastVersion)
} else if(cluster.controllerListenerName().asScala.contains(listenerName)) { } else if(cluster.controllerListenerName().asScala.contains(listenerName)) {
ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.CONTROLLER) ApiVersionsResponse.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion)
} else { } else {
ApiVersionsResponse.createApiVersionsResponse(0, expectedApis.asInstanceOf[ApiVersionsResponseData.ApiVersionCollection]) ApiVersionsResponse.createApiVersionsResponse(0, expectedApis)
} }
for (expectedApiVersion: ApiVersion <- defaultApiVersionsResponse.data.apiKeys().asScala) { for (expectedApiVersion: ApiVersion <- defaultApiVersionsResponse.data.apiKeys.asScala) {
val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey) val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
assertNotNull(actualApiVersion, s"API key ${expectedApiVersion.apiKey()} is supported by broker, but not received in ApiVersionsResponse.") assertNotNull(actualApiVersion, s"API key ${expectedApiVersion.apiKey()} is supported by broker, but not received in ApiVersionsResponse.")
assertEquals(expectedApiVersion.apiKey, actualApiVersion.apiKey, "API key must be supported by the broker.") assertEquals(expectedApiVersion.apiKey, actualApiVersion.apiKey, "API key must be supported by the broker.")

View File

@ -40,10 +40,34 @@ class ApiVersionManagerTest {
listenerType = apiScope, listenerType = apiScope,
forwardingManager = None, forwardingManager = None,
features = brokerFeatures, features = brokerFeatures,
metadataCache = metadataCache metadataCache = metadataCache,
enableUnstableLastVersion = true
) )
assertEquals(ApiKeys.apisForListener(apiScope).asScala, versionManager.enabledApis) assertEquals(ApiKeys.apisForListener(apiScope).asScala, versionManager.enabledApis)
assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall(versionManager.isApiEnabled)) assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall { apiKey =>
apiKey.allVersions.asScala.forall { version =>
versionManager.isApiEnabled(apiKey, version)
}
})
}
@ParameterizedTest
@EnumSource(classOf[ListenerType])
def testDisabledApis(apiScope: ListenerType): Unit = {
val versionManager = new DefaultApiVersionManager(
listenerType = apiScope,
forwardingManager = None,
features = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = false
)
ApiKeys.apisForListener(apiScope).forEach { apiKey =>
if (apiKey.messageType.latestVersionUnstable()) {
assertFalse(versionManager.isApiEnabled(apiKey, apiKey.latestVersion),
s"$apiKey version ${apiKey.latestVersion} should be disabled.")
}
}
} }
@Test @Test
@ -63,7 +87,8 @@ class ApiVersionManagerTest {
listenerType = ListenerType.ZK_BROKER, listenerType = ListenerType.ZK_BROKER,
forwardingManager = Some(forwardingManager), forwardingManager = Some(forwardingManager),
features = brokerFeatures, features = brokerFeatures,
metadataCache = metadataCache metadataCache = metadataCache,
enableUnstableLastVersion = true
) )
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
@ -83,9 +108,10 @@ class ApiVersionManagerTest {
listenerType = ListenerType.BROKER, listenerType = ListenerType.BROKER,
forwardingManager = forwardingManagerOpt, forwardingManager = forwardingManagerOpt,
features = brokerFeatures, features = brokerFeatures,
metadataCache = metadataCache metadataCache = metadataCache,
enableUnstableLastVersion = true
) )
assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE)) assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
@ -104,9 +130,10 @@ class ApiVersionManagerTest {
listenerType = ListenerType.ZK_BROKER, listenerType = ListenerType.ZK_BROKER,
forwardingManager = Some(forwardingManager), forwardingManager = Some(forwardingManager),
features = brokerFeatures, features = brokerFeatures,
metadataCache = metadataCache metadataCache = metadataCache,
enableUnstableLastVersion = true
) )
assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE)) assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
@ -122,13 +149,13 @@ class ApiVersionManagerTest {
listenerType = ListenerType.ZK_BROKER, listenerType = ListenerType.ZK_BROKER,
forwardingManager = None, forwardingManager = None,
features = brokerFeatures, features = brokerFeatures,
metadataCache = metadataCache metadataCache = metadataCache,
enableUnstableLastVersion = true
) )
assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE)) assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)) assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
} }
} }

View File

@ -21,7 +21,7 @@ import kafka.test.{ClusterConfig, ClusterInstance}
import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest import org.apache.kafka.common.requests.ApiVersionsRequest
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type} import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions import kafka.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
@ -44,6 +44,13 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
validateApiVersionsResponse(apiVersionsResponse) validateApiVersionsResponse(apiVersionsResponse)
} }
@ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")))
def testApiVersionsRequestIncludesUnreleasedApis(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
}
@ClusterTest(clusterType = Type.ZK) @ClusterTest(clusterType = Type.ZK)
def testApiVersionsRequestThroughControlPlaneListener(): Unit = { def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build() val request = new ApiVersionsRequest.Builder().build()

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import java.io.EOFException
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
@Tag("integration")
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testConsumerGroupHeartbeatIsDisabledByDefault(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
).build()
assertThrows(classOf[EOFException], () => connectAndReceive(consumerGroupHeartbeatRequest))
}
@ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")))
def testConsumerGroupHeartbeatIsAccessibleWhenEnabled(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
).build()
val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {
IntegrationTestUtils.connectAndReceive[ConsumerGroupHeartbeatResponse](
request,
cluster.anyBrokerSocketServer(),
cluster.clientListener()
)
}
}

View File

@ -109,7 +109,7 @@ class ControllerApisTest {
new KafkaConfig(props), new KafkaConfig(props),
MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId), MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty, Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER) new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
) )
} }

View File

@ -182,7 +182,7 @@ class KafkaApisTest {
} else { } else {
ApiKeys.apisForListener(listenerType).asScala.toSet ApiKeys.apisForListener(listenerType).asScala.toSet
} }
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures()) val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
new KafkaApis( new KafkaApis(
requestChannel = requestChannel, requestChannel = requestChannel,
@ -5743,4 +5743,20 @@ class KafkaApisTest {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId) metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest) verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
} }
@Test
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
val requestChannelRequest = buildRequest(
new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData()
.setGroupId("group")
).build()
)
createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)
val expectedHeartbeatResponse = new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
assertEquals(expectedHeartbeatResponse, response.data)
}
} }

View File

@ -81,6 +81,7 @@ class RequestQuotaTest extends BaseRequestTest {
properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName) properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName)
properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName) properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true")
} }
@BeforeEach @BeforeEach
@ -643,6 +644,10 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.ALLOCATE_PRODUCER_IDS => case ApiKeys.ALLOCATE_PRODUCER_IDS =>
new AllocateProducerIdsRequest.Builder(new AllocateProducerIdsRequestData()) new AllocateProducerIdsRequest.Builder(new AllocateProducerIdsRequestData())
case ApiKeys.CONSUMER_GROUP_HEARTBEAT =>
new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData())
case _ => case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey) throw new IllegalArgumentException("Unsupported API key " + apiKey)
} }

View File

@ -154,10 +154,12 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("%n"); buffer.printf("%n");
generateAccessor("lowestSupportedVersion", "short"); generateAccessor("lowestSupportedVersion", "short");
buffer.printf("%n"); buffer.printf("%n");
generateAccessor("highestSupportedVersion", "short"); generateHighestSupportedVersion();
buffer.printf("%n"); buffer.printf("%n");
generateAccessor("listeners", "EnumSet<ListenerType>"); generateAccessor("listeners", "EnumSet<ListenerType>");
buffer.printf("%n"); buffer.printf("%n");
generateAccessor("latestVersionUnstable", "boolean");
buffer.printf("%n");
generateAccessor("apiKey", "short"); generateAccessor("apiKey", "short");
buffer.printf("%n"); buffer.printf("%n");
generateAccessor("requestSchemas", "Schema[]"); generateAccessor("requestSchemas", "Schema[]");
@ -210,7 +212,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d, %s)%s%n", buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d, %s, %s)%s%n",
MessageGenerator.toSnakeCase(name).toUpperCase(Locale.ROOT), MessageGenerator.toSnakeCase(name).toUpperCase(Locale.ROOT),
MessageGenerator.capitalizeFirst(name), MessageGenerator.capitalizeFirst(name),
entry.getKey(), entry.getKey(),
@ -219,6 +221,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
apiData.requestSpec.struct().versions().lowest(), apiData.requestSpec.struct().versions().lowest(),
apiData.requestSpec.struct().versions().highest(), apiData.requestSpec.struct().versions().highest(),
generateListenerTypeEnumSet(listeners), generateListenerTypeEnumSet(listeners),
apiData.requestSpec.latestVersionUnstable(),
(numProcessed == apis.size()) ? ";" : ","); (numProcessed == apis.size()) ? ";" : ",");
} }
} }
@ -231,6 +234,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("private final short lowestSupportedVersion;%n"); buffer.printf("private final short lowestSupportedVersion;%n");
buffer.printf("private final short highestSupportedVersion;%n"); buffer.printf("private final short highestSupportedVersion;%n");
buffer.printf("private final EnumSet<ListenerType> listeners;%n"); buffer.printf("private final EnumSet<ListenerType> listeners;%n");
buffer.printf("private final boolean latestVersionUnstable;%n");
headerGenerator.addImport(MessageGenerator.SCHEMA_CLASS); headerGenerator.addImport(MessageGenerator.SCHEMA_CLASS);
headerGenerator.addImport(MessageGenerator.ENUM_SET_CLASS); headerGenerator.addImport(MessageGenerator.ENUM_SET_CLASS);
} }
@ -239,7 +243,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("ApiMessageType(String name, short apiKey, " + buffer.printf("ApiMessageType(String name, short apiKey, " +
"Schema[] requestSchemas, Schema[] responseSchemas, " + "Schema[] requestSchemas, Schema[] responseSchemas, " +
"short lowestSupportedVersion, short highestSupportedVersion, " + "short lowestSupportedVersion, short highestSupportedVersion, " +
"EnumSet<ListenerType> listeners) {%n"); "EnumSet<ListenerType> listeners, boolean latestVersionUnstable) {%n");
buffer.incrementIndent(); buffer.incrementIndent();
buffer.printf("this.name = name;%n"); buffer.printf("this.name = name;%n");
buffer.printf("this.apiKey = apiKey;%n"); buffer.printf("this.apiKey = apiKey;%n");
@ -248,6 +252,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("this.lowestSupportedVersion = lowestSupportedVersion;%n"); buffer.printf("this.lowestSupportedVersion = lowestSupportedVersion;%n");
buffer.printf("this.highestSupportedVersion = highestSupportedVersion;%n"); buffer.printf("this.highestSupportedVersion = highestSupportedVersion;%n");
buffer.printf("this.listeners = listeners;%n"); buffer.printf("this.listeners = listeners;%n");
buffer.printf("this.latestVersionUnstable = latestVersionUnstable;%n");
buffer.decrementIndent(); buffer.decrementIndent();
buffer.printf("}%n"); buffer.printf("}%n");
} }
@ -403,6 +408,23 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("}%n"); buffer.printf("}%n");
} }
private void generateHighestSupportedVersion() {
buffer.printf("public short highestSupportedVersion(boolean enableUnstableLastVersion) {%n");
buffer.incrementIndent();
buffer.printf("if (!this.latestVersionUnstable || enableUnstableLastVersion) {%n");
buffer.incrementIndent();
buffer.printf("return this.highestSupportedVersion;%n");
buffer.decrementIndent();
buffer.printf("} else {%n");
buffer.incrementIndent();
buffer.printf("// A negative value means that the API has no enabled versions.%n");
buffer.printf("return (short) (this.highestSupportedVersion - 1);%n");
buffer.decrementIndent();
buffer.printf("}%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
private void write(BufferedWriter writer) throws IOException { private void write(BufferedWriter writer) throws IOException {
headerGenerator.buffer().write(writer); headerGenerator.buffer().write(writer);
buffer.write(writer); buffer.write(writer);

View File

@ -39,6 +39,8 @@ public final class MessageSpec {
private final List<RequestListenerType> listeners; private final List<RequestListenerType> listeners;
private final boolean latestVersionUnstable;
@JsonCreator @JsonCreator
public MessageSpec(@JsonProperty("name") String name, public MessageSpec(@JsonProperty("name") String name,
@JsonProperty("validVersions") String validVersions, @JsonProperty("validVersions") String validVersions,
@ -47,7 +49,9 @@ public final class MessageSpec {
@JsonProperty("type") MessageSpecType type, @JsonProperty("type") MessageSpecType type,
@JsonProperty("commonStructs") List<StructSpec> commonStructs, @JsonProperty("commonStructs") List<StructSpec> commonStructs,
@JsonProperty("flexibleVersions") String flexibleVersions, @JsonProperty("flexibleVersions") String flexibleVersions,
@JsonProperty("listeners") List<RequestListenerType> listeners) { @JsonProperty("listeners") List<RequestListenerType> listeners,
@JsonProperty("latestVersionUnstable") boolean latestVersionUnstable
) {
this.struct = new StructSpec(name, validVersions, fields); this.struct = new StructSpec(name, validVersions, fields);
this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey); this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
this.type = Objects.requireNonNull(type); this.type = Objects.requireNonNull(type);
@ -70,6 +74,12 @@ public final class MessageSpec {
"messages with type `request`"); "messages with type `request`");
} }
this.listeners = listeners; this.listeners = listeners;
if (latestVersionUnstable && type != MessageSpecType.REQUEST) {
throw new RuntimeException("The `latestVersionUnstable` property is only valid for " +
"messages with type `request`");
}
this.latestVersionUnstable = latestVersionUnstable;
} }
public StructSpec struct() { public StructSpec struct() {
@ -124,6 +134,11 @@ public final class MessageSpec {
return listeners; return listeners;
} }
@JsonProperty("latestVersionUnstable")
public boolean latestVersionUnstable() {
return latestVersionUnstable;
}
public String dataClassName() { public String dataClassName() {
switch (type) { switch (type) {
case HEADER: case HEADER:

View File

@ -199,7 +199,7 @@ public class MetadataRequestBenchmark {
setClusterId("clusterId"). setClusterId("clusterId").
setTime(Time.SYSTEM). setTime(Time.SYSTEM).
setTokenManager(null). setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER)). setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false)).
build(); build();
} }