KAFKA-12278; Ensure exposed api versions are consistent within listener (#10666)

Previously all APIs were accessible on every listener exposed by the broker, but
with KIP-500, that is no longer true.  We now have more complex requirements for
API accessibility.

For example, the KIP-500 controller exposes some APIs which are not exposed by
brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
such as JoinGroupRequest. Similarly, the KIP-500 broker does not implement some
APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
UpdateFeaturesRequest.

All of this means that we need more sophistication in how we expose APIs and
keep them consistent with the ApiVersions API. Up until now, we have been
working around this using the controllerOnly flag inside ApiKeys, but this is
not rich enough to support all of the cases listed above.  This PR introduces a
new "listeners" field to the request schema definitions.  This field is an array
of strings which indicate the listener types in which the API should be exposed.
We currently support "zkBroker", "broker", and "controller".  ("broker"
indicates the KIP-500 broker, whereas zkBroker indicates the old broker).
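
To illustrate how this surfaces in code: the schema compiler records these tags
in the generated ApiMessageType, and the new ApiKeys.inScope and
ApiKeys.apisForListener helpers (see the ApiKeys.java diff below) query them.
A minimal sketch; the ListenerType constants mirror the tags, though only
ZK_BROKER appears verbatim in this diff:

    import java.util.EnumSet;

    import org.apache.kafka.common.message.ApiMessageType;
    import org.apache.kafka.common.protocol.ApiKeys;

    public class ListenerScopeExample {
        public static void main(String[] args) {
            // JoinGroup is a client API: exposed on both broker listener
            // types, but not on the controller listener.
            System.out.println(ApiKeys.JOIN_GROUP.inScope(ApiMessageType.ListenerType.BROKER));     // true
            System.out.println(ApiKeys.JOIN_GROUP.inScope(ApiMessageType.ListenerType.CONTROLLER)); // false

            // BrokerHeartbeat is declared with "listeners": ["controller"].
            System.out.println(ApiKeys.BROKER_HEARTBEAT.inScope(ApiMessageType.ListenerType.CONTROLLER)); // true

            // Every API exposed by the ZK-based broker.
            EnumSet<ApiKeys> zkApis = ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
            System.out.println(zkApis.size());
        }
    }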

This PR also creates ApiVersionManager to encapsulate the creation of the
ApiVersionsResponse based on the listener type.  Additionally, it modifies
SocketServer to check the listener type of received requests before forwarding
them to the request handler.
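
ApiVersionManager itself lives in the broker core and is not part of the
excerpts below, so the following is only a rough sketch of the idea, assembled
from the client-side helpers this patch adds (the class name is hypothetical):

    import org.apache.kafka.common.message.ApiMessageType;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.ApiVersionsResponse;

    // Hypothetical stand-in for ApiVersionManager: pin the listener type once,
    // then answer both questions SocketServer needs to keep consistent.
    class ListenerScopedApiVersions {
        private final ApiMessageType.ListenerType listenerType;

        ListenerScopedApiVersions(ApiMessageType.ListenerType listenerType) {
            this.listenerType = listenerType;
        }

        // SocketServer-style gate: reject APIs this listener does not expose.
        boolean isApiEnabled(ApiKeys apiKey) {
            return apiKey.inScope(listenerType);
        }

        // An ApiVersionsResponse restricted to exactly the same set of APIs.
        ApiVersionsResponse apiVersionsResponse(int throttleTimeMs) {
            return ApiVersionsResponse.createApiVersionsResponse(throttleTimeMs,
                ApiVersionsResponse.collectApis(ApiKeys.apisForListener(listenerType)));
        }
    }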

Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
sent prior to authentication. Previously a static response was sent, which meant
that changes to features were not reflected. It also meant that the logic to
expose only the intersection of the version ranges supported by the broker and
the active controller did not work. I think this is important because some
clients rely on the initial pre-authentication ApiVersions response rather than
doing a second round after authentication, as the Java client does.
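
Concretely, the fix threads a Supplier<ApiVersionsResponse> through
ChannelBuilders and SaslServerAuthenticator so the response is computed per
request rather than read from the removed DEFAULT_API_VERSIONS_RESPONSE
constant. A minimal wiring sketch, mirroring the NioEchoServer test change
below (the config argument and the null credential/token caches are
placeholders):

    import java.util.function.Supplier;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.message.ApiMessageType;
    import org.apache.kafka.common.network.ChannelBuilder;
    import org.apache.kafka.common.network.ChannelBuilders;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.requests.ApiVersionsResponse;
    import org.apache.kafka.common.security.auth.SecurityProtocol;
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.common.utils.Time;

    class PreAuthApiVersionsWiring {
        static ChannelBuilder serverBuilder(AbstractConfig config) {
            // Re-evaluated on every pre-authentication ApiVersionsRequest, so
            // feature changes and controller intersections are picked up.
            Supplier<ApiVersionsResponse> apiVersionSupplier = () ->
                ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
            SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
            return ChannelBuilders.serverChannelBuilder(
                ListenerName.forSecurityProtocol(securityProtocol), false, securityProtocol,
                config, null, null, Time.SYSTEM, new LogContext(), apiVersionSupplier);
        }
    }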

One final cleanup note: I have removed the expectation that envelope requests
are only allowed on "privileged" listeners.  This made sense initially because
we expected to use forwarding before the KIP-500 controller was available. That
is no longer the case, and we now expect the Envelope API to be exposed only on
the controller listener. I have nevertheless preserved the existing workarounds to
allow verification of the forwarding behavior in integration testing.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
Authored by Jason Gustafson on 2021-02-04 10:04:17 -08:00; committed by Colin P. McCabe
parent b50a78b4ac
commit 698319b8e2
113 changed files with 1090 additions and 584 deletions

import-control.xml

@ -50,6 +50,7 @@
<subpackage name="common">
<allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />
<allow class="org.apache.kafka.common.message.ApiMessageType" exact-match="true" />
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.annotation" />
@ -108,6 +109,7 @@
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.security" />
<allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />
</subpackage>
<subpackage name="resource">

NodeApiVersions.java

@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@ -27,7 +24,10 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -62,7 +62,7 @@ public class NodeApiVersions {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {
@ -170,7 +170,7 @@ public class NodeApiVersions {
// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
// which may happen when the remote is too old.
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").

ChannelBuilders.java

@ -21,6 +21,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
@ -40,6 +41,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;
public class ChannelBuilders {
private static final Logger log = LoggerFactory.getLogger(ChannelBuilders.class);
@ -77,7 +79,7 @@ public class ChannelBuilders {
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
saslHandshakeRequestEnable, null, null, time, logContext);
saslHandshakeRequestEnable, null, null, time, logContext, null);
}
/**
@ -89,6 +91,7 @@ public class ChannelBuilders {
* @param tokenCache Delegation token cache
* @param time the time instance
* @param logContext the log context instance
* @param apiVersionSupplier supplier for ApiVersions responses sent prior to authentication
*
* @return the configured `ChannelBuilder`
*/
@ -99,10 +102,11 @@ public class ChannelBuilders {
CredentialCache credentialCache,
DelegationTokenCache tokenCache,
Time time,
LogContext logContext) {
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName,
isInterBrokerListener, null, true, credentialCache,
tokenCache, time, logContext);
tokenCache, time, logContext, apiVersionSupplier);
}
private static ChannelBuilder create(SecurityProtocol securityProtocol,
@ -116,7 +120,8 @@ public class ChannelBuilders {
CredentialCache credentialCache,
DelegationTokenCache tokenCache,
Time time,
LogContext logContext) {
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
ChannelBuilder channelBuilder;
@ -174,7 +179,8 @@ public class ChannelBuilders {
tokenCache,
sslClientAuthOverride,
time,
logContext);
logContext,
apiVersionSupplier);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder(listenerName);

SaslChannelBuilder.java

@ -21,6 +21,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
@ -85,6 +86,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
private final DelegationTokenCache tokenCache;
private final Map<String, LoginManager> loginManagers;
private final Map<String, Subject> subjects;
private final Supplier<ApiVersionsResponse> apiVersionSupplier;
private SslFactory sslFactory;
private Map<String, ?> configs;
@ -108,7 +110,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
DelegationTokenCache tokenCache,
String sslClientAuthOverride,
Time time,
LogContext logContext) {
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
this.mode = mode;
this.jaasContexts = jaasContexts;
this.loginManagers = new HashMap<>(jaasContexts.size());
@ -126,6 +129,11 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
this.time = time;
this.logContext = logContext;
this.log = logContext.logger(getClass());
this.apiVersionSupplier = apiVersionSupplier;
if (mode == Mode.SERVER && apiVersionSupplier == null) {
throw new IllegalArgumentException("Server channel builder must provide an ApiVersionsResponse supplier");
}
}
@SuppressWarnings("unchecked")
@ -266,7 +274,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
ChannelMetadataRegistry metadataRegistry) {
return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects,
kerberosShortNamer, listenerName, securityProtocol, transportLayer,
connectionsMaxReauthMsByMechanism, metadataRegistry, time);
connectionsMaxReauthMsByMechanism, metadataRegistry, time, apiVersionSupplier);
}
// Visible to override for testing

ApiKeys.java

@ -21,7 +21,10 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@ -90,19 +93,28 @@ public enum ApiKeys {
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, false);
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
static {
for (ApiMessageType.ListenerType listenerType : ApiMessageType.ListenerType.values()) {
APIS_BY_LISTENER.put(listenerType, filterApisForListener(listenerType));
}
}
// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
@ -120,9 +132,6 @@ public enum ApiKeys {
/** indicates the minimum required inter broker magic required to support the API */
public final byte minRequiredInterBrokerMagic;
/** indicates whether this is an API which is only exposed by the KIP-500 controller **/
public final boolean isControllerOnlyApi;
/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
@ -142,24 +151,17 @@ public enum ApiKeys {
this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, forwardable);
}
ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, false);
}
ApiKeys(
ApiMessageType messageType,
boolean clusterAction,
byte minRequiredInterBrokerMagic,
boolean forwardable,
boolean isControllerOnlyApi
boolean forwardable
) {
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
this.isControllerOnlyApi = isControllerOnlyApi;
this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
}
@ -195,6 +197,14 @@ public enum ApiKeys {
return messageType.lowestSupportedVersion();
}
public List<Short> allVersions() {
List<Short> versions = new ArrayList<>(latestVersion() - oldestVersion() + 1);
for (short version = oldestVersion(); version <= latestVersion(); version++) {
versions.add(version);
}
return versions;
}
public boolean isVersionSupported(short apiVersion) {
return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
}
@ -207,6 +217,10 @@ public enum ApiKeys {
return messageType.responseHeaderVersion(apiVersion);
}
public boolean inScope(ApiMessageType.ListenerType listener) {
return messageType.listeners().contains(listener);
}
private static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
@ -214,7 +228,7 @@ public enum ApiKeys {
b.append("<th>Name</th>\n");
b.append("<th>Key</th>\n");
b.append("</tr>");
for (ApiKeys key : ApiKeys.brokerApis()) {
for (ApiKeys key : zkBrokerApis()) {
b.append("<tr>\n");
b.append("<td>");
b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name + "</a>");
@ -246,10 +260,19 @@ public enum ApiKeys {
return hasBuffer.get();
}
public static List<ApiKeys> brokerApis() {
return Arrays.stream(values())
.filter(api -> !api.isControllerOnlyApi)
public static EnumSet<ApiKeys> zkBrokerApis() {
return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
}
public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {
return APIS_BY_LISTENER.get(listener);
}
private static EnumSet<ApiKeys> filterApisForListener(ApiMessageType.ListenerType listener) {
List<ApiKeys> controllerApis = Arrays.stream(ApiKeys.values())
.filter(apiKey -> apiKey.messageType.listeners().contains(listener))
.collect(Collectors.toList());
return EnumSet.copyOf(controllerApis);
}
}

Protocol.java

@ -133,7 +133,7 @@ public class Protocol {
b.append("</pre>\n");
schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
}
for (ApiKeys key : ApiKeys.brokerApis()) {
for (ApiKeys key : ApiKeys.zkBrokerApis()) {
// Key
b.append("<h5>");
b.append("<a name=\"The_Messages_" + key.name + "\">");

ApiVersionsResponse.java

@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@ -29,11 +30,12 @@ import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureK
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RecordVersion;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Possible error codes:
@ -44,9 +46,6 @@ public class ApiVersionsResponse extends AbstractResponse {
public static final long UNKNOWN_FINALIZED_FEATURES_EPOCH = -1L;
public static final ApiVersionsResponse DEFAULT_API_VERSIONS_RESPONSE = createApiVersionsResponse(
DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
private final ApiVersionsResponseData data;
public ApiVersionsResponse(ApiVersionsResponseData data) {
@ -96,49 +95,89 @@ public class ApiVersionsResponse extends AbstractResponse {
}
}
public static ApiVersionsResponse createApiVersionsResponse(final int throttleTimeMs, final byte minMagic) {
return createApiVersionsResponse(throttleTimeMs, minMagic, Features.emptySupportedFeatures(),
Features.emptyFinalizedFeatures(), UNKNOWN_FINALIZED_FEATURES_EPOCH);
public static ApiVersionsResponse defaultApiVersionsResponse(
ApiMessageType.ListenerType listenerType
) {
return defaultApiVersionsResponse(0, listenerType);
}
private static ApiVersionsResponse createApiVersionsResponse(
final int throttleTimeMs,
final byte minMagic,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Features<FinalizedVersionRange> finalizedFeatures,
final long finalizedFeaturesEpoch) {
public static ApiVersionsResponse defaultApiVersionsResponse(
int throttleTimeMs,
ApiMessageType.ListenerType listenerType
) {
return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType));
}
public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions
) {
return createApiVersionsResponse(
throttleTimeMs,
apiVersions,
Features.emptySupportedFeatures(),
Features.emptyFinalizedFeatures(),
UNKNOWN_FINALIZED_FEATURES_EPOCH
);
}
public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
Features<FinalizedVersionRange> finalizedFeatures,
long finalizedFeaturesEpoch
) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
throttleTimeMs,
Errors.NONE,
defaultApiKeys(minMagic),
apiVersions,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch));
finalizedFeaturesEpoch
)
);
}
public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
public static ApiVersionCollection filterApis(
RecordVersion minRecordVersion,
ApiMessageType.ListenerType listenerType
) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
}
}
return apiKeys;
}
public static ApiVersionCollection collectApis(Set<ApiKeys> apiKeys) {
ApiVersionCollection res = new ApiVersionCollection();
for (ApiKeys apiKey : apiKeys) {
res.add(ApiVersionsResponse.toApiVersion(apiKey));
}
return res;
}
/**
* Find the commonly agreed ApiVersions between local software and the controller.
* Find the common range of supported API versions between the locally
* known range and that of another set.
*
* @param minMagic min inter broker magic
* @param listenerType the listener type which constrains the set of exposed APIs
* @param minRecordVersion min inter broker magic
* @param activeControllerApiVersions controller ApiVersions
* @return commonly agreed ApiVersion collection
*/
public static ApiVersionCollection intersectControllerApiVersions(final byte minMagic,
final Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
public static ApiVersionCollection intersectForwardableApis(
final ApiMessageType.ListenerType listenerType,
final RecordVersion minRecordVersion,
final Map<ApiKeys, ApiVersion> activeControllerApiVersions
) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
ApiVersion brokerApiVersion = toApiVersion(apiKey);
final ApiVersion finalApiVersion;
@ -161,7 +200,7 @@ public class ApiVersionsResponse extends AbstractResponse {
return apiKeys;
}
public static ApiVersionsResponseData createApiVersionsResponseData(
private static ApiVersionsResponseData createApiVersionsResponseData(
final int throttleTimeMs,
final Errors error,
final ApiVersionCollection apiKeys,

SaslServerAuthenticator.java

@ -85,6 +85,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
public class SaslServerAuthenticator implements Authenticator {
// GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
@ -127,6 +128,7 @@ public class SaslServerAuthenticator implements Authenticator {
private final Time time;
private final ReauthInfo reauthInfo;
private final ChannelMetadataRegistry metadataRegistry;
private final Supplier<ApiVersionsResponse> apiVersionSupplier;
// Current SASL state
private SaslState saslState = SaslState.INITIAL_REQUEST;
@ -154,7 +156,8 @@ public class SaslServerAuthenticator implements Authenticator {
TransportLayer transportLayer,
Map<String, Long> connectionsMaxReauthMsByMechanism,
ChannelMetadataRegistry metadataRegistry,
Time time) {
Time time,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
this.callbackHandlers = callbackHandlers;
this.connectionId = connectionId;
this.subjects = subjects;
@ -166,6 +169,7 @@ public class SaslServerAuthenticator implements Authenticator {
this.time = time;
this.reauthInfo = new ReauthInfo();
this.metadataRegistry = metadataRegistry;
this.apiVersionSupplier = apiVersionSupplier;
this.configs = configs;
@SuppressWarnings("unchecked")
@ -562,11 +566,6 @@ public class SaslServerAuthenticator implements Authenticator {
}
}
// Visible to override for testing
protected ApiVersionsResponse apiVersionsResponse() {
return ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE;
}
// Visible to override for testing
protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
this.enableKafkaSaslAuthenticateHeaders = flag;
@ -583,7 +582,7 @@ public class SaslServerAuthenticator implements Authenticator {
else {
metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data().clientSoftwareName(),
apiVersionsRequest.data().clientSoftwareVersion()));
sendKafkaResponse(context, apiVersionsResponse());
sendKafkaResponse(context, apiVersionSupplier.get());
setSaslState(SaslState.HANDSHAKE_REQUEST);
}
}

AddOffsetsToTxnRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 25,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "AddOffsetsToTxnRequest",
// Version 1 is the same as version 0.
//

AddPartitionsToTxnRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 24,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "AddPartitionsToTxnRequest",
// Version 1 is the same as version 0.
//

AlterClientQuotasRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 49,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "AlterClientQuotasRequest",
"validVersions": "0-1",
// Version 1 enables flexible versions.

AlterConfigsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 33,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "AlterConfigsRequest",
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.

AlterIsrRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 56,
"type": "request",
"listeners": ["zkBroker", "controller"],
"name": "AlterIsrRequest",
"validVersions": "0",
"flexibleVersions": "0+",

AlterPartitionReassignmentsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 45,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

AlterReplicaLogDirsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 34,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "AlterReplicaLogDirsRequest",
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.

AlterUserScramCredentialsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 51,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "AlterUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

ApiVersionsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 18,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "ApiVersionsRequest",
// Versions 0 through 2 of ApiVersionsRequest are the same.
//

BeginQuorumEpochRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 53,
"type": "request",
"listeners": ["controller"],
"name": "BeginQuorumEpochRequest",
"validVersions": "0",
"fields": [

BrokerHeartbeatRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 63,
"type": "request",
"listeners": ["controller"],
"name": "BrokerHeartbeatRequest",
"validVersions": "0",
"flexibleVersions": "0+",

BrokerRegistrationRequest.json

@ -16,6 +16,7 @@
{
"apiKey":62,
"type": "request",
"listeners": ["controller"],
"name": "BrokerRegistrationRequest",
"validVersions": "0",
"flexibleVersions": "0+",

ControlledShutdownRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 7,
"type": "request",
"listeners": ["zkBroker", "controller"],
"name": "ControlledShutdownRequest",
// Version 0 of ControlledShutdownRequest has a non-standard request header
// which does not include clientId. Version 1 and later use the standard

CreateAclsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 30,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "CreateAclsRequest",
// Version 1 adds resource pattern type.
// Version 2 enables flexible versions.

CreateDelegationTokenRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 38,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "CreateDelegationTokenRequest",
// Version 1 is the same as version 0.
//

CreatePartitionsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 37,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "CreatePartitionsRequest",
// Version 1 is the same as version 0.
//

CreateTopicsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 19,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "CreateTopicsRequest",
// Version 1 adds validateOnly.
//

DeleteAclsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 31,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "DeleteAclsRequest",
// Version 1 adds the pattern type.
// Version 2 enables flexible versions.

DeleteGroupsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 42,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DeleteGroupsRequest",
// Version 1 is the same as version 0.
//

DeleteRecordsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 21,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DeleteRecordsRequest",
// Version 1 is the same as version 0.

DeleteTopicsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 20,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "DeleteTopicsRequest",
// Versions 0, 1, 2, and 3 are the same.
//

DescribeAclsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 29,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeAclsRequest",
// Version 1 adds resource pattern type.
// Version 2 enables flexible versions.

DescribeClientQuotasRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 48,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DescribeClientQuotasRequest",
// Version 1 enables flexible versions.
"validVersions": "0-1",

DescribeClusterRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 60,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DescribeClusterRequest",
"validVersions": "0",
"flexibleVersions": "0+",

DescribeConfigsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 32,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeConfigsRequest",
// Version 1 adds IncludeSynonyms.
// Version 2 is the same as version 1.

DescribeDelegationTokenRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 41,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

DescribeGroupsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 15,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DescribeGroupsRequest",
// Versions 1 and 2 are the same as version 0.
//

DescribeLogDirsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 35,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DescribeLogDirsRequest",
// Version 1 is the same as version 0.
"validVersions": "0-2",

DescribeProducersRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 61,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DescribeProducersRequest",
"validVersions": "0",
"flexibleVersions": "0+",

DescribeQuorumRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 55,
"type": "request",
"listeners": ["broker", "controller"],
"name": "DescribeQuorumRequest",
"validVersions": "0",
"flexibleVersions": "0+",

DescribeUserScramCredentialsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 50,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

ElectLeadersRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 43,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "ElectLeadersRequest",
// Version 1 implements multiple leader election types, as described by KIP-460.
//

EndQuorumEpochRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 54,
"type": "request",
"listeners": ["controller"],
"name": "EndQuorumEpochRequest",
"validVersions": "0",
"fields": [

EndTxnRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 26,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "EndTxnRequest",
// Version 1 is the same as version 0.
//

EnvelopeRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 58,
"type": "request",
"listeners": ["controller"],
"name": "EnvelopeRequest",
// Request struct for forwarding.
"validVersions": "0",

ExpireDelegationTokenRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 40,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "ExpireDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

FetchRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 1,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "FetchRequest",
//
// Version 1 is the same as version 0.

FetchSnapshotRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 59,
"type": "request",
"listeners": ["controller"],
"name": "FetchSnapshotRequest",
"validVersions": "0",
"flexibleVersions": "0+",

FindCoordinatorRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 10,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "FindCoordinatorRequest",
// Version 1 adds KeyType.
//

HeartbeatRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 12,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "HeartbeatRequest",
// Version 1 and version 2 are the same as version 0.
//

IncrementalAlterConfigsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 44,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "IncrementalAlterConfigsRequest",
// Version 1 is the first flexible version.
"validVersions": "0-1",

InitProducerIdRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 22,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "InitProducerIdRequest",
// Version 1 is the same as version 0.
//

JoinGroupRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 11,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "JoinGroupRequest",
// Version 1 adds RebalanceTimeoutMs.
//

LeaderAndIsrRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 4,
"type": "request",
"listeners": ["zkBroker"],
"name": "LeaderAndIsrRequest",
// Version 1 adds IsNew.
//

LeaveGroupRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 13,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "LeaveGroupRequest",
// Version 1 and 2 are the same as version 0.
//

ListGroupsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 16,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ListGroupsRequest",
// Version 1 and 2 are the same as version 0.
//

ListOffsetsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 2,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ListOffsetsRequest",
// Version 1 removes MaxNumOffsets. From this version forward, only a single
// offset can be returned.

ListPartitionReassignmentsRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 46,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ListPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

MetadataRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 3,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "MetadataRequest",
"validVersions": "0-11",
"flexibleVersions": "9+",

OffsetCommitRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 8,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "OffsetCommitRequest",
// Version 1 adds timestamp and group membership information, as well as the commit timestamp.
//

OffsetDeleteRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 47,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "OffsetDeleteRequest",
"validVersions": "0",
"fields": [

OffsetFetchRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 9,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "OffsetFetchRequest",
// In version 0, the request read offsets from ZK.
//

OffsetForLeaderEpochRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 23,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "OffsetForLeaderEpochRequest",
// Version 1 is the same as version 0.
//

ProduceRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 0,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ProduceRequest",
// Version 1 and 2 are the same as version 0.
//

RenewDelegationTokenRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 39,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "RenewDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

SaslAuthenticateRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 36,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "SaslAuthenticateRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

SaslHandshakeRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 17,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "SaslHandshakeRequest",
// Version 1 supports SASL_AUTHENTICATE.
// NOTE: Version cannot be easily bumped due to incorrect

StopReplicaRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 5,
"type": "request",
"listeners": ["zkBroker"],
"name": "StopReplicaRequest",
// Version 1 adds the broker epoch and reorganizes the partitions to be stored
// per topic.

SyncGroupRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 14,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "SyncGroupRequest",
// Versions 1 and 2 are the same as version 0.
//

TxnOffsetCommitRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 28,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "TxnOffsetCommitRequest",
// Version 1 is the same as version 0.
//

UnregisterBrokerRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 64,
"type": "request",
"listeners": ["broker", "controller"],
"name": "UnregisterBrokerRequest",
"validVersions": "0",
"flexibleVersions": "0+",

UpdateFeaturesRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 57,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "UpdateFeaturesRequest",
"validVersions": "0",
"flexibleVersions": "0+",

UpdateMetadataRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 6,
"type": "request",
"listeners": ["zkBroker"],
"name": "UpdateMetadataRequest",
// Version 1 allows specifying multiple endpoints for each broker.
//

VoteRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 52,
"type": "request",
"listeners": ["controller"],
"name": "VoteRequest",
"validVersions": "0",
"flexibleVersions": "0+",

WriteTxnMarkersRequest.json

@ -16,6 +16,7 @@
{
"apiKey": 27,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "WriteTxnMarkersRequest",
// Version 1 enables flexible versions.
"validVersions": "0-1",

NetworkClientTest.java

@ -22,6 +22,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@ -247,7 +248,8 @@ public class NetworkClientTest {
private void awaitReady(NetworkClient client, Node node) {
if (client.discoverBrokerVersions()) {
setExpectedApiVersionsResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
setExpectedApiVersionsResponse(ApiVersionsResponse.defaultApiVersionsResponse(
ApiMessageType.ListenerType.ZK_BROKER));
}
while (!client.ready(node, time.milliseconds()))
client.poll(1, time.milliseconds());
@ -295,8 +297,7 @@ public class NetworkClientTest {
assertTrue(client.hasInFlightRequests(node.idString()));
// prepare response
delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(),
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), defaultApiVersionsResponse());
// handle completed receives
client.poll(0, time.milliseconds());
@ -367,8 +368,7 @@ public class NetworkClientTest {
assertEquals(2, header.apiVersion());
// prepare response
delayedApiVersionsResponse(1, (short) 0,
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
// handle completed receives
client.poll(0, time.milliseconds());
@ -434,8 +434,7 @@ public class NetworkClientTest {
assertEquals(0, header.apiVersion());
// prepare response
delayedApiVersionsResponse(1, (short) 0,
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
// handle completed receives
client.poll(0, time.milliseconds());
@ -1079,6 +1078,10 @@ public class NetworkClientTest {
assertFalse(client.isReady(node, time.milliseconds()));
}
private ApiVersionsResponse defaultApiVersionsResponse() {
return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
}
private static class TestCallbackHandler implements RequestCompletionHandler {
public boolean executed = false;
public ClientResponse response;

NodeApiVersionsTest.java

@ -16,18 +16,21 @@
*/
package org.apache.kafka.clients;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -38,7 +41,7 @@ public class NodeApiVersionsTest {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
bld.append(prefix).append(apiKey.name).
append("(").append(apiKey.id).append("): UNSUPPORTED");
prefix = ", ";
@ -133,27 +136,26 @@ public class NodeApiVersionsTest {
() -> apiVersions.latestUsableVersion(ApiKeys.PRODUCE));
}
@Test
public void testUsableVersionLatestVersions() {
List<ApiVersion> versionList = new LinkedList<>(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys());
@ParameterizedTest
@EnumSource(ApiMessageType.ListenerType.class)
public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
ApiVersionsResponse defaultResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
List<ApiVersion> versionList = new LinkedList<>(defaultResponse.data().apiKeys());
// Add an API key that we don't know about.
versionList.add(new ApiVersion()
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList);
for (ApiKeys apiKey: ApiKeys.values()) {
if (apiKey.isControllerOnlyApi) {
assertNull(versions.apiVersion(apiKey));
} else {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
}
@Test
public void testConstructionFromApiVersionsResponse() {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE;
@ParameterizedTest
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {

KafkaAdminClientTest.java

@ -74,6 +74,7 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.CreateAclsResponseData;
@ -126,6 +127,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
@ -541,17 +543,17 @@ public class KafkaAdminClientTest {
private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures(Errors error) {
if (error == Errors.NONE) {
return new ApiVersionsResponse(ApiVersionsResponse.createApiVersionsResponseData(
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs(),
error,
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(),
return ApiVersionsResponse.createApiVersionsResponse(
0,
ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
convertFinalizedFeaturesMap(defaultFeatureMetadata().finalizedFeatures()),
defaultFeatureMetadata().finalizedFeaturesEpoch().get()));
defaultFeatureMetadata().finalizedFeaturesEpoch().get()
);
}
return new ApiVersionsResponse(
new ApiVersionsResponseData()
.setThrottleTimeMs(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs())
.setThrottleTimeMs(0)
.setErrorCode(error.code()));
}

FetcherTest.java

@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
import org.apache.kafka.common.message.ListOffsetsResponseData;
@ -2074,8 +2075,9 @@ public class FetcherTest {
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, ClientDnsLookup.USE_ALL_DNS_IPS,
time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(ApiVersionsResponse.createApiVersionsResponse(
400, RecordBatch.CURRENT_MAGIC_VALUE), ApiKeys.API_VERSIONS.latestVersion(), 0);
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
while (!client.ready(node, time.milliseconds())) {

SenderTest.java

@ -24,13 +24,6 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@ -39,15 +32,19 @@ import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -66,15 +63,19 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
@ -105,12 +106,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.AdditionalMatchers.geq;
import static org.mockito.ArgumentMatchers.any;
@ -287,9 +288,9 @@ public class SenderTest {
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, ClientDnsLookup.USE_ALL_DNS_IPS,
time, true, new ApiVersions(), throttleTimeSensor, logContext);
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(
ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE),
ApiKeys.API_VERSIONS.latestVersion(), 0);
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
while (!client.ready(node, time.milliseconds())) {

NioEchoServer.java

@ -18,9 +18,11 @@ package org.apache.kafka.common.network;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
@ -117,7 +119,8 @@ public class NioEchoServer extends Thread {
LogContext logContext = new LogContext();
if (channelBuilder == null)
channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false,
securityProtocol, config, credentialCache, tokenCache, time, logContext);
securityProtocol, config, credentialCache, tokenCache, time, logContext,
() -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
this.metrics = new Metrics();
this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time,
"MetricGroup", channelBuilder, logContext);

SaslChannelBuilderTest.java

@ -21,6 +21,8 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@ -48,6 +50,7 @@ import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -160,9 +163,8 @@ public class SaslChannelBuilderTest {
private SaslChannelBuilder createGssapiChannelBuilder(Map<String, JaasContext> jaasContexts, GSSManager gssManager) {
SaslChannelBuilder channelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
SecurityProtocol.SASL_PLAINTEXT,
new ListenerName("GSSAPI"), false, "GSSAPI",
true, null, null, null, Time.SYSTEM, new LogContext()) {
SecurityProtocol.SASL_PLAINTEXT, new ListenerName("GSSAPI"), false, "GSSAPI",
true, null, null, null, Time.SYSTEM, new LogContext(), defaultApiVersionsSupplier()) {
@Override
protected GSSManager gssManager() {
@ -174,6 +176,10 @@ public class SaslChannelBuilderTest {
return channelBuilder;
}
private Supplier<ApiVersionsResponse> defaultApiVersionsSupplier() {
return () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
}
private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol, String saslMechanism) {
Class<?> loginModule = null;
switch (saslMechanism) {
@ -198,7 +204,7 @@ public class SaslChannelBuilderTest {
Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName(saslMechanism),
false, saslMechanism, true, null,
null, null, Time.SYSTEM, new LogContext());
null, null, Time.SYSTEM, new LogContext(), defaultApiVersionsSupplier());
}
public static final class TestGssapiLoginModule implements LoginModule {

SslTransportLayerTest.java

@ -22,7 +22,9 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
@ -58,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
@ -1018,7 +1021,8 @@ public class SslTransportLayerTest {
TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
true, securityProtocol, config, null, null, time, new LogContext());
true, securityProtocol, config, null, null, time, new LogContext(),
defaultApiVersionsSupplier());
server = new NioEchoServer(listenerName, securityProtocol, config,
"localhost", serverChannelBuilder, null, time);
server.start();
@ -1040,8 +1044,9 @@ public class SslTransportLayerTest {
args.sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
assertThrows(KafkaException.class, () -> ChannelBuilders.serverChannelBuilder(listenerName, true, securityProtocol, config,
null, null, time, new LogContext()));
assertThrows(KafkaException.class, () -> ChannelBuilders.serverChannelBuilder(
listenerName, true, securityProtocol, config,
null, null, time, new LogContext(), defaultApiVersionsSupplier()));
}
/**
@ -1055,7 +1060,8 @@ public class SslTransportLayerTest {
TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
false, securityProtocol, config, null, null, time, new LogContext());
false, securityProtocol, config, null, null, time, new LogContext(),
defaultApiVersionsSupplier());
server = new NioEchoServer(listenerName, securityProtocol, config,
"localhost", serverChannelBuilder, null, time);
server.start();
@ -1111,7 +1117,8 @@ public class SslTransportLayerTest {
TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
false, securityProtocol, config, null, null, time, new LogContext());
false, securityProtocol, config, null, null, time, new LogContext(),
defaultApiVersionsSupplier());
server = new NioEchoServer(listenerName, securityProtocol, config,
"localhost", serverChannelBuilder, null, time);
server.start();
@ -1176,7 +1183,8 @@ public class SslTransportLayerTest {
TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
false, securityProtocol, config, null, null, time, new LogContext());
false, securityProtocol, config, null, null, time, new LogContext(),
defaultApiVersionsSupplier());
server = new NioEchoServer(listenerName, securityProtocol, config,
"localhost", serverChannelBuilder, null, time);
server.start();
@ -1334,6 +1342,10 @@ public class SslTransportLayerTest {
void run() throws IOException;
}
private Supplier<ApiVersionsResponse> defaultApiVersionsSupplier() {
return () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
}
static class TestSslChannelBuilder extends SslChannelBuilder {
private Integer netReadBufSizeOverride;


@ -20,9 +20,12 @@ import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -60,11 +63,7 @@ public class ApiKeysTest {
Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
// Newer protocol apis include throttle time ms even for cluster actions
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR);
for (ApiKeys apiKey: ApiKeys.values()) {
// Disable broker-to-controller API throttling test
if (apiKey.isControllerOnlyApi) {
continue;
}
for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
BoundField throttleTimeField = responseSchema.get("throttle_time_ms");
if ((apiKey.clusterAction && !clusterActionsWithThrottleTimeMs.contains(apiKey))
@ -74,4 +73,17 @@ public class ApiKeysTest {
assertNotNull(throttleTimeField, "Throttle time field missing: " + apiKey);
}
}
@Test
public void testApiScope() {
Set<ApiKeys> apisMissingScope = new HashSet<>();
for (ApiKeys apiKey : ApiKeys.values()) {
if (apiKey.messageType.listeners().isEmpty()) {
apisMissingScope.add(apiKey);
}
}
assertEquals(Collections.emptySet(), apisMissingScope,
"Found some APIs missing scope definition");
}
}
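An illustrative sketch (not part of the patch) of the scoping that testApiScope exercises: every API now declares the listener types on which it is exposed, and the per-listener sets can be enumerated through the same ApiKeys.apisForListener accessor used above.

    import org.apache.kafka.common.message.ApiMessageType.ListenerType
    import org.apache.kafka.common.protocol.ApiKeys
    import scala.jdk.CollectionConverters._

    object ListenerScopes extends App {
      // For each listener type, print which APIs are exposed on it.
      ListenerType.values().foreach { listener =>
        val apis = ApiKeys.apisForListener(listener).asScala.map(_.name)
        println(s"$listener exposes ${apis.size} APIs: ${apis.toSeq.sorted.mkString(", ")}")
      }
    }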


@ -17,17 +17,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -38,21 +38,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ApiVersionsResponseTest {
@Test
public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.brokerApis()));
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
}
@ParameterizedTest
@EnumSource(ApiMessageType.ListenerType.class)
public void shouldHaveCorrectDefaultApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse defaultResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
assertEquals(ApiKeys.apisForListener(scope).size(), defaultResponse.data().apiKeys().size(),
"API versions for all API keys must be maintained.");
@Test
public void shouldHaveCorrectDefaultApiVersionsResponse() {
Collection<ApiVersion> apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys();
assertEquals(apiVersions.size(), ApiKeys.brokerApis().size(), "API versions for all API keys must be maintained.");
for (ApiKeys key : ApiKeys.brokerApis()) {
ApiVersion version = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id);
for (ApiKeys key : ApiKeys.apisForListener(scope)) {
ApiVersion version = defaultResponse.apiVersion(key.id);
assertNotNull(version, "Could not find ApiVersion for API " + key.name);
assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name);
assertEquals(version.maxVersion(), key.latestVersion(), "Incorrect max version for Api " + key.name);
@ -74,9 +68,9 @@ public class ApiVersionsResponseTest {
}
}
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
assertTrue(defaultResponse.data().supportedFeatures().isEmpty());
assertTrue(defaultResponse.data().finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, defaultResponse.data().finalizedFeaturesEpoch());
}
@Test
@ -96,9 +90,11 @@ public class ApiVersionsResponseTest {
.setMaxVersion(maxVersion))
);
ApiVersionCollection commonResponse = ApiVersionsResponse.intersectControllerApiVersions(
RecordBatch.CURRENT_MAGIC_VALUE,
activeControllerApiVersions);
ApiVersionCollection commonResponse = ApiVersionsResponse.intersectForwardableApis(
ApiMessageType.ListenerType.ZK_BROKER,
RecordVersion.current(),
activeControllerApiVersions
);
verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse);
@ -149,11 +145,4 @@ public class ApiVersionsResponseTest {
assertEquals(expectedVersionsForForwardableAPI, commonResponse.find(forwardableAPIKey));
}
private Set<ApiKeys> apiKeysInResponse(final ApiVersionsResponse apiVersions) {
final Set<ApiKeys> apiKeys = new HashSet<>();
for (final ApiVersion version : apiVersions.data().apiKeys()) {
apiKeys.add(ApiKeys.forId(version.apiKey()));
}
return apiKeys;
}
}


@ -44,6 +44,7 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
@ -145,11 +146,11 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
@ -346,17 +347,6 @@ public class RequestResponseTest {
checkErrorResponse(createSaslAuthenticateRequest(), unknownServerException, true);
checkResponse(createSaslAuthenticateResponse(), 0, true);
checkResponse(createSaslAuthenticateResponse(), 1, true);
checkRequest(createApiVersionRequest(), true);
checkErrorResponse(createApiVersionRequest(), unknownServerException, true);
checkErrorResponse(createApiVersionRequest(), new UnsupportedVersionException("Not Supported"), true);
checkResponse(createApiVersionResponse(), 0, true);
checkResponse(createApiVersionResponse(), 1, true);
checkResponse(createApiVersionResponse(), 2, true);
checkResponse(createApiVersionResponse(), 3, true);
checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 0, true);
checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 1, true);
checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 2, true);
checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 3, true);
for (int v = ApiKeys.CREATE_TOPICS.oldestVersion(); v <= ApiKeys.CREATE_TOPICS.latestVersion(); v++) {
checkRequest(createCreateTopicRequest(v), true);
@ -521,9 +511,20 @@ public class RequestResponseTest {
checkResponse(createAlterClientQuotasResponse(), 0, true);
}
@Test
public void testApiVersionsSerialization() {
for (short v : ApiKeys.API_VERSIONS.allVersions()) {
checkRequest(createApiVersionRequest(v), true);
checkErrorResponse(createApiVersionRequest(v), unknownServerException, true);
checkErrorResponse(createApiVersionRequest(v), new UnsupportedVersionException("Not Supported"), true);
checkResponse(createApiVersionResponse(), v, true);
checkResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), v, true);
}
}
@Test
public void testBrokerHeartbeatSerialization() {
for (short v = ApiKeys.BROKER_HEARTBEAT.oldestVersion(); v <= ApiKeys.BROKER_HEARTBEAT.latestVersion(); v++) {
for (short v : ApiKeys.BROKER_HEARTBEAT.allVersions()) {
checkRequest(createBrokerHeartbeatRequest(v), true);
checkErrorResponse(createBrokerHeartbeatRequest(v), unknownServerException, true);
checkResponse(createBrokerHeartbeatResponse(), v, true);
@ -532,7 +533,7 @@ public class RequestResponseTest {
@Test
public void testBrokerRegistrationSerialization() {
for (short v = ApiKeys.BROKER_REGISTRATION.oldestVersion(); v <= ApiKeys.BROKER_REGISTRATION.latestVersion(); v++) {
for (short v : ApiKeys.BROKER_REGISTRATION.allVersions()) {
checkRequest(createBrokerRegistrationRequest(v), true);
checkErrorResponse(createBrokerRegistrationRequest(v), unknownServerException, true);
checkResponse(createBrokerRegistrationResponse(), 0, true);
@ -540,8 +541,8 @@ public class RequestResponseTest {
}
@Test
public void testDescribeProducersSerialization() throws Exception {
for (short v = ApiKeys.DESCRIBE_PRODUCERS.oldestVersion(); v <= ApiKeys.DESCRIBE_PRODUCERS.latestVersion(); v++) {
public void testDescribeProducersSerialization() {
for (short v : ApiKeys.DESCRIBE_PRODUCERS.allVersions()) {
checkRequest(createDescribeProducersRequest(v), true);
checkErrorResponse(createDescribeProducersRequest(v), unknownServerException, true);
checkResponse(createDescribeProducersResponse(), v, true);
@ -549,8 +550,8 @@ public class RequestResponseTest {
}
@Test
public void testDescribeClusterSerialization() throws Exception {
for (short v = ApiKeys.DESCRIBE_CLUSTER.oldestVersion(); v <= ApiKeys.DESCRIBE_CLUSTER.latestVersion(); v++) {
public void testDescribeClusterSerialization() {
for (short v : ApiKeys.DESCRIBE_CLUSTER.allVersions()) {
checkRequest(createDescribeClusterRequest(v), true);
checkErrorResponse(createDescribeClusterRequest(v), unknownServerException, true);
checkResponse(createDescribeClusterResponse(), v, true);
@ -559,7 +560,7 @@ public class RequestResponseTest {
@Test
public void testUnregisterBrokerSerialization() {
for (short v = ApiKeys.UNREGISTER_BROKER.oldestVersion(); v <= ApiKeys.UNREGISTER_BROKER.latestVersion(); v++) {
for (short v : ApiKeys.UNREGISTER_BROKER.allVersions()) {
checkRequest(createUnregisterBrokerRequest(v), true);
checkErrorResponse(createUnregisterBrokerRequest(v), unknownServerException, true);
checkResponse(createUnregisterBrokerResponse(), v, true);
@ -1013,47 +1014,56 @@ public class RequestResponseTest {
@Test
public void testApiVersionResponseWithUnsupportedError() {
ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
ApiVersionsResponse response = request.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception());
for (short version : ApiKeys.API_VERSIONS.allVersions()) {
ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version);
ApiVersionsResponse response = request.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception());
assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data().errorCode());
assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data().errorCode());
ApiVersion apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
assertNotNull(apiVersion);
assertEquals(ApiKeys.API_VERSIONS.id, apiVersion.apiKey());
assertEquals(ApiKeys.API_VERSIONS.oldestVersion(), apiVersion.minVersion());
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion());
ApiVersion apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
assertNotNull(apiVersion);
assertEquals(ApiKeys.API_VERSIONS.id, apiVersion.apiKey());
assertEquals(ApiKeys.API_VERSIONS.oldestVersion(), apiVersion.minVersion());
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion());
}
}
@Test
public void testApiVersionResponseWithNotUnsupportedError() {
ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
ApiVersionsResponse response = request.getErrorResponse(0, Errors.INVALID_REQUEST.exception());
for (short version : ApiKeys.API_VERSIONS.allVersions()) {
ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version);
ApiVersionsResponse response = request.getErrorResponse(0, Errors.INVALID_REQUEST.exception());
assertEquals(response.data().errorCode(), Errors.INVALID_REQUEST.code());
assertTrue(response.data().apiKeys().isEmpty());
}
}
assertEquals(response.data().errorCode(), Errors.INVALID_REQUEST.code());
assertTrue(response.data().apiKeys().isEmpty());
private ApiVersionsResponse defaultApiVersionsResponse() {
return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
}
@Test
public void testApiVersionResponseParsingFallback() {
ByteBuffer buffer = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.serialize((short) 0);
ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, ApiKeys.API_VERSIONS.latestVersion());
assertEquals(Errors.NONE.code(), response.data().errorCode());
for (short version : ApiKeys.API_VERSIONS.allVersions()) {
ByteBuffer buffer = defaultApiVersionsResponse().serialize((short) 0);
ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, version);
assertEquals(Errors.NONE.code(), response.data().errorCode());
}
}
@Test
public void testApiVersionResponseParsingFallbackException() {
short version = 0;
assertThrows(BufferUnderflowException.class, () -> ApiVersionsResponse.parse(ByteBuffer.allocate(0), version));
for (final short version : ApiKeys.API_VERSIONS.allVersions()) {
assertThrows(BufferUnderflowException.class, () -> ApiVersionsResponse.parse(ByteBuffer.allocate(0), version));
}
}
@Test
public void testApiVersionResponseParsing() {
ByteBuffer buffer = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.serialize(ApiKeys.API_VERSIONS.latestVersion());
ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, ApiKeys.API_VERSIONS.latestVersion());
assertEquals(Errors.NONE.code(), response.data().errorCode());
for (short version : ApiKeys.API_VERSIONS.allVersions()) {
ByteBuffer buffer = defaultApiVersionsResponse().serialize(version);
ApiVersionsResponse response = ApiVersionsResponse.parse(buffer, version);
assertEquals(Errors.NONE.code(), response.data().errorCode());
}
}
@Test
@ -1773,8 +1783,8 @@ public class RequestResponseTest {
return new SaslAuthenticateResponse(data);
}
private ApiVersionsRequest createApiVersionRequest() {
return new ApiVersionsRequest.Builder().build();
private ApiVersionsRequest createApiVersionRequest(short version) {
return new ApiVersionsRequest.Builder().build(version);
}
private ApiVersionsResponse createApiVersionResponse() {


@ -16,41 +16,6 @@
*/
package org.apache.kafka.common.security.authenticator;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Base64.Encoder;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
@ -60,13 +25,14 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@ -87,26 +53,28 @@ import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.TestDigestLoginModule.DigestServerCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
@ -115,20 +83,17 @@ import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBea
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.authenticator.TestDigestLoginModule.DigestServerCallbackHandler;
import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@ -137,6 +102,41 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opentest4j.AssertionFailedError;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -1903,31 +1903,18 @@ public class SaslAuthenticatorTest {
boolean isScram = ScramMechanism.isScram(saslMechanism);
if (isScram)
ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
Supplier<ApiVersionsResponse> apiVersionSupplier = () -> {
ApiVersionCollection versionCollection = new ApiVersionCollection(2);
versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(versionCollection));
};
SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
securityProtocol, listenerName, false, saslMechanism, true,
credentialCache, null, null, time, new LogContext()) {
credentialCache, null, null, time, new LogContext(), apiVersionSupplier);
@Override
protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs,
Map<String, AuthenticateCallbackHandler> callbackHandlers,
String id,
TransportLayer transportLayer,
Map<String, Subject> subjects,
Map<String, Long> connectionsMaxReauthMsByMechanism,
ChannelMetadataRegistry metadataRegistry) {
return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName,
securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time) {
@Override
protected ApiVersionsResponse apiVersionsResponse() {
ApiVersionCollection versionCollection = new ApiVersionCollection(2);
versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100));
return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(versionCollection));
}
};
}
};
serverChannelBuilder.configure(saslServerConfigs);
server = new NioEchoServer(listenerName, securityProtocol, new TestSecurityConfig(saslServerConfigs),
"localhost", serverChannelBuilder, credentialCache, time);
@ -1945,10 +1932,29 @@ public class SaslAuthenticatorTest {
boolean isScram = ScramMechanism.isScram(saslMechanism);
if (isScram)
ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
Supplier<ApiVersionsResponse> apiVersionSupplier = () -> {
ApiVersionsResponse defaultApiVersionResponse = ApiVersionsResponse.defaultApiVersionsResponse(
ApiMessageType.ListenerType.ZK_BROKER);
ApiVersionCollection apiVersions = new ApiVersionCollection();
for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) {
if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) {
// ApiVersion can NOT be reused in second ApiVersionCollection
// due to the internal pointers it contains.
apiVersions.add(apiVersion.duplicate());
}
}
ApiVersionsResponseData data = new ApiVersionsResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(0)
.setApiKeys(apiVersions);
return new ApiVersionsResponse(data);
};
SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
securityProtocol, listenerName, false, saslMechanism, true,
credentialCache, null, null, time, new LogContext()) {
credentialCache, null, null, time, new LogContext(), apiVersionSupplier) {
@Override
protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs,
Map<String, AuthenticateCallbackHandler> callbackHandlers,
@ -1958,27 +1964,7 @@ public class SaslAuthenticatorTest {
Map<String, Long> connectionsMaxReauthMsByMechanism,
ChannelMetadataRegistry metadataRegistry) {
return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName,
securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time) {
@Override
protected ApiVersionsResponse apiVersionsResponse() {
ApiVersionsResponse defaultApiVersionResponse = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE;
ApiVersionCollection apiVersions = new ApiVersionCollection();
for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) {
if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) {
// ApiVersion can NOT be reused in second ApiVersionCollection
// due to the internal pointers it contains.
apiVersions.add(apiVersion.duplicate());
}
}
ApiVersionsResponseData data = new ApiVersionsResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(0)
.setApiKeys(apiVersions);
return new ApiVersionsResponse(data);
}
securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time, apiVersionSupplier) {
@Override
protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
// Don't enable Kafka SASL_AUTHENTICATE headers
@ -2003,7 +1989,7 @@ public class SaslAuthenticatorTest {
SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts,
securityProtocol, listenerName, false, saslMechanism, true,
null, null, null, time, new LogContext()) {
null, null, null, time, new LogContext(), null) {
@Override
protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs,
@ -2545,7 +2531,8 @@ public class SaslAuthenticatorTest {
String clientSaslMechanism, boolean handshakeRequestEnable, CredentialCache credentialCache,
DelegationTokenCache tokenCache, Time time) {
super(mode, jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism,
handshakeRequestEnable, credentialCache, tokenCache, null, time, new LogContext());
handshakeRequestEnable, credentialCache, tokenCache, null, time, new LogContext(),
() -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
}
@Override


@ -19,6 +19,7 @@ package org.apache.kafka.common.security.authenticator;
import java.net.InetAddress;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
@ -27,6 +28,7 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@ -152,15 +154,17 @@ public class SaslServerAuthenticatorTest {
}
private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer,
String mechanism, ChannelMetadataRegistry metadataRegistry) throws IOException {
String mechanism, ChannelMetadataRegistry metadataRegistry) {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
mechanism, new SaslServerCallbackHandler());
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
ApiMessageType.ListenerType.ZK_BROKER);
return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null,
new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, Collections.emptyMap(),
metadataRegistry, Time.SYSTEM);
metadataRegistry, Time.SYSTEM, () -> apiVersionsResponse);
}
}


@ -21,10 +21,9 @@ import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{RecordBatch, RecordVersion}
import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse}
import org.apache.kafka.common.requests.ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.record.RecordVersion
import org.apache.kafka.common.requests.ApiVersionsResponse
/**
* This class contains the different Kafka versions.
@ -147,52 +146,46 @@ object ApiVersion {
}
}
def apiVersionsResponse(throttleTimeMs: Int,
maxMagic: Byte,
latestSupportedFeatures: Features[SupportedVersionRange],
controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
def apiVersionsResponse(
throttleTimeMs: Int,
minRecordVersion: RecordVersion,
latestSupportedFeatures: Features[SupportedVersionRange],
controllerApiVersions: Option[NodeApiVersions],
listenerType: ListenerType
): ApiVersionsResponse = {
apiVersionsResponse(
throttleTimeMs,
maxMagic,
minRecordVersion,
latestSupportedFeatures,
Features.emptyFinalizedFeatures,
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
controllerApiVersions
controllerApiVersions,
listenerType
)
}
def apiVersionsResponse(throttleTimeMs: Int,
maxMagic: Byte,
latestSupportedFeatures: Features[SupportedVersionRange],
finalizedFeatures: Features[FinalizedVersionRange],
finalizedFeaturesEpoch: Long,
controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
def apiVersionsResponse(
throttleTimeMs: Int,
minRecordVersion: RecordVersion,
latestSupportedFeatures: Features[SupportedVersionRange],
finalizedFeatures: Features[FinalizedVersionRange],
finalizedFeaturesEpoch: Long,
controllerApiVersions: Option[NodeApiVersions],
listenerType: ListenerType
): ApiVersionsResponse = {
val apiKeys = controllerApiVersions match {
case None => ApiVersionsResponse.defaultApiKeys(maxMagic)
case Some(controllerApiVersion) => ApiVersionsResponse.intersectControllerApiVersions(
maxMagic, controllerApiVersion.allSupportedApiVersions())
case None => ApiVersionsResponse.filterApis(minRecordVersion, listenerType)
case Some(controllerApiVersion) => ApiVersionsResponse.intersectForwardableApis(
listenerType, minRecordVersion, controllerApiVersion.allSupportedApiVersions())
}
if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE &&
throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME) {
new ApiVersionsResponse(
ApiVersionsResponse.createApiVersionsResponseData(
DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs,
Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode),
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch))
} else {
new ApiVersionsResponse(
ApiVersionsResponse.createApiVersionsResponseData(
throttleTimeMs,
Errors.NONE,
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch))
}
ApiVersionsResponse.createApiVersionsResponse(
throttleTimeMs,
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
)
}
}
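A hedged usage sketch of the reworked helper (illustrative only; the real wiring lives in ApiVersionManager below). It assumes Features.emptySupportedFeatures from the common feature package. Passing None for controllerApiVersions advertises the listener's own version ranges unfiltered, while Some(...) intersects the forwardable APIs with the controller's ranges.

    import kafka.api.ApiVersion
    import org.apache.kafka.common.feature.Features
    import org.apache.kafka.common.message.ApiMessageType.ListenerType
    import org.apache.kafka.common.record.RecordVersion

    // ZK broker response when no controller API versions are known yet.
    val response = ApiVersion.apiVersionsResponse(
      throttleTimeMs = 0,
      minRecordVersion = RecordVersion.current(),
      latestSupportedFeatures = Features.emptySupportedFeatures,
      controllerApiVersions = None,
      listenerType = ListenerType.ZK_BROKER
    )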


@ -30,6 +30,7 @@ import kafka.utils.{Logging, NotNothing, Pool}
import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
import org.apache.kafka.common.network.Send
@ -59,16 +60,19 @@ object RequestChannel extends Logging {
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}
class Metrics(allowControllerOnlyApis: Boolean = false) {
class Metrics(enabledApis: Iterable[ApiKeys]) {
def this(scope: ListenerType) = {
this(ApiKeys.apisForListener(scope).asScala)
}
private val metricsMap = mutable.Map[String, RequestMetrics]()
(ApiKeys.values.toSeq.filter(!_.isControllerOnlyApi || allowControllerOnlyApis).map(_.name) ++
Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
(enabledApis.map(_.name) ++
Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
metricsMap.put(name, new RequestMetrics(name))
}
def apply(metricName: String) = metricsMap(metricName)
def apply(metricName: String): RequestMetrics = metricsMap(metricName)
def close(): Unit = {
metricsMap.values.foreach(_.removeMetrics())
@ -296,8 +300,6 @@ object RequestChannel extends Logging {
def responseLog: Option[JsonNode] = None
def onComplete: Option[Send => Unit] = None
override def toString: String
}
/** responseLogValue should only be defined if request logging is enabled */
@ -337,9 +339,8 @@ object RequestChannel extends Logging {
class RequestChannel(val queueSize: Int,
val metricNamePrefix: String,
time: Time,
allowControllerOnlyApis: Boolean = false) extends KafkaMetricsGroup {
val metrics: RequestChannel.Metrics) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics(allowControllerOnlyApis)
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
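A small sketch (an assumption about how the scoped metrics behave, not code from the patch): each request channel now registers per-API metrics only for the APIs its listener can receive, plus the consumer/follower fetch breakdowns added unconditionally above.

    import kafka.network.RequestChannel
    import org.apache.kafka.common.message.ApiMessageType.ListenerType
    import org.apache.kafka.common.protocol.ApiKeys

    val controllerMetrics = new RequestChannel.Metrics(ListenerType.CONTROLLER)
    val voteMetrics = controllerMetrics(ApiKeys.VOTE.name) // in scope, resolves
    // controllerMetrics(ApiKeys.JOIN_GROUP.name) would fail the map lookup,
    // since client APIs are not registered for the controller listener.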


@ -33,7 +33,7 @@ import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
import kafka.security.CredentialProvider
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
import kafka.utils.Implicits._
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
@ -78,15 +78,14 @@ class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider,
allowControllerOnlyApis: Boolean = false,
controllerSocketServer: Boolean = false)
val apiVersionManager: ApiVersionManager)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
private val maxQueuedRequests = config.queuedMaxRequests
private val nodeId = config.brokerId
private val logContext = new LogContext(s"[SocketServer ${if (controllerSocketServer) "controller" else "broker"}Id=${nodeId}] ")
private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")
this.logIdent = logContext.logPrefix
@ -98,12 +97,12 @@ class SocketServer(val config: KafkaConfig,
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowControllerOnlyApis)
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time, allowControllerOnlyApis))
new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
private var nextProcessorId = 0
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@ -438,8 +437,9 @@ class SocketServer(val config: KafkaConfig,
credentialProvider,
memoryPool,
logContext,
isPrivilegedListener = isPrivilegedListener,
allowControllerOnlyApis = allowControllerOnlyApis
Processor.ConnectionQueueSize,
isPrivilegedListener,
apiVersionManager
)
}
@ -772,7 +772,6 @@ private[kafka] object Processor {
val IdlePercentMetricName = "IdlePercent"
val NetworkProcessorMetricTag = "networkProcessor"
val ListenerMetricTag = "listener"
val ConnectionQueueSize = 20
}
@ -800,9 +799,9 @@ private[kafka] class Processor(val id: Int,
credentialProvider: CredentialProvider,
memoryPool: MemoryPool,
logContext: LogContext,
connectionQueueSize: Int = ConnectionQueueSize,
isPrivilegedListener: Boolean = false,
allowControllerOnlyApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
connectionQueueSize: Int,
isPrivilegedListener: Boolean,
apiVersionManager: ApiVersionManager) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@ -842,14 +841,19 @@ private[kafka] class Processor(val id: Int,
metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
private val selector = createSelector(
ChannelBuilders.serverChannelBuilder(listenerName,
ChannelBuilders.serverChannelBuilder(
listenerName,
listenerName == config.interBrokerListenerName,
securityProtocol,
config,
credentialProvider.credentialCache,
credentialProvider.tokenCache,
time,
logContext))
logContext,
() => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
)
)
// Visible to override for testing
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
channelBuilder match {
@ -993,10 +997,10 @@ private[kafka] class Processor(val id: Int,
protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer)
if (!header.apiKey.isControllerOnlyApi || allowControllerOnlyApis) {
if (apiVersionManager.isApiEnabled(header.apiKey)) {
header
} else {
throw new InvalidRequestException("Received request for KIP-500 controller-only api key " + header.apiKey)
throw new InvalidRequestException(s"Received request api key ${header.apiKey} which is not enabled")
}
}


@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.api.ApiVersion
import kafka.network
import kafka.network.RequestChannel
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ApiVersionsResponseData
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import scala.jdk.CollectionConverters._
trait ApiVersionManager {
def listenerType: ListenerType
def enabledApis: collection.Set[ApiKeys]
def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
}
object ApiVersionManager {
def apply(
listenerType: ListenerType,
config: KafkaConfig,
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
featureCache: FinalizedFeatureCache
): ApiVersionManager = {
new DefaultApiVersionManager(
listenerType,
config.interBrokerProtocolVersion,
forwardingManager,
features,
featureCache
)
}
}
class SimpleApiVersionManager(
val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys]
) extends ApiVersionManager {
def this(listenerType: ListenerType) = {
this(listenerType, ApiKeys.apisForListener(listenerType).asScala)
}
private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
ApiVersionsResponse.createApiVersionsResponse(throttleTimeMs, apiVersions)
}
}
class DefaultApiVersionManager(
val listenerType: ListenerType,
interBrokerProtocolVersion: ApiVersion,
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
featureCache: FinalizedFeatureCache
) extends ApiVersionManager {
override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
val supportedFeatures = features.supportedFeatures
val finalizedFeaturesOpt = featureCache.get
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
val response = finalizedFeaturesOpt match {
case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
throttleTimeMs,
interBrokerProtocolVersion.recordVersion,
supportedFeatures,
finalizedFeatures.features,
finalizedFeatures.epoch,
controllerApiVersions,
listenerType)
case None => ApiVersion.apiVersionsResponse(
throttleTimeMs,
interBrokerProtocolVersion.recordVersion,
supportedFeatures,
controllerApiVersions,
listenerType)
}
// This is a temporary workaround in order to allow testing of forwarding
// in integration tests. We can remove this after the KIP-500 controller
// is available for integration testing.
if (forwardingManager.isDefined) {
response.data.apiKeys.add(
new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.ENVELOPE.id)
.setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
.setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
)
}
response
}
override def enabledApis: collection.Set[ApiKeys] = {
forwardingManager match {
case Some(_) => ApiKeys.apisForListener(listenerType).asScala ++ Set(ApiKeys.ENVELOPE)
case None => ApiKeys.apisForListener(listenerType).asScala
}
}
override def isApiEnabled(apiKey: ApiKeys): Boolean = {
apiKey.inScope(listenerType) || (apiKey == ApiKeys.ENVELOPE && forwardingManager.isDefined)
}
}
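A hedged sketch of how the new gating behaves for the listener types defined here (illustrative; this mirrors the check SocketServer.parseRequestHeader applies before handing a request to the handler):

    import org.apache.kafka.common.message.ApiMessageType.ListenerType
    import org.apache.kafka.common.protocol.ApiKeys

    val controllerApis = new SimpleApiVersionManager(ListenerType.CONTROLLER)
    assert(controllerApis.isApiEnabled(ApiKeys.BROKER_HEARTBEAT)) // controller API
    assert(!controllerApis.isApiEnabled(ApiKeys.JOIN_GROUP))      // client API, rejected

    // On a broker, DefaultApiVersionManager additionally reports ENVELOPE as
    // enabled only while the forwarding-manager workaround above is in place.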


@ -33,6 +33,7 @@ import kafka.server.KafkaBroker.metricsPrefix
import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
import kafka.utils.{CoreUtils, KafkaScheduler}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@ -167,7 +168,7 @@ class BrokerServer(
// Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
// until we catch up on the metadata log and have up-to-date topic and broker configs.
logManager = LogManager(config, initialOfflineDirs, configRepository, kafkaScheduler, time,
brokerTopicStats, logDirFailureChannel, true)
brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
metadataCache = MetadataCache.raftMetadataCache(config.nodeId)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
@ -175,17 +176,44 @@ class BrokerServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
val forwardingChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
config,
channelName = "forwarding",
threadNamePrefix,
retryTimeoutMs = 60000
)
forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
forwardingManager.start()
val apiVersionManager = ApiVersionManager(
ListenerType.BROKER,
config,
Some(forwardingManager),
brokerFeatures,
featureCache
)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = false)
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
socketServer.startup(startProcessingRequests = false)
val controllerNodes =
RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
val alterIsrChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, "alterisr", threadNamePrefix, 60000)
val alterIsrChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
config,
channelName = "alterisr",
threadNamePrefix,
retryTimeoutMs = Long.MaxValue
)
alterIsrManager = new DefaultAlterIsrManager(
controllerChannelManager = alterIsrChannelManager,
scheduler = kafkaScheduler,
@ -200,11 +228,6 @@ class BrokerServer(
brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
configRepository, threadNamePrefix)
val forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, "forwarding", threadNamePrefix, 60000)
forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
forwardingManager.start()
/* start token manager */
if (config.tokenAuthEnabled) {
throw new UnsupportedOperationException("Delegation tokens are not supported")
@ -306,7 +329,7 @@ class BrokerServer(
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
@ -315,7 +338,7 @@ class BrokerServer(
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport,
replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)


@ -28,6 +28,7 @@ import kafka.raft.RaftManager
import kafka.security.CredentialProvider
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@ -124,15 +125,16 @@ class ControllerServer(
}.toMap
}
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
socketServer = new SocketServer(config,
metrics,
time,
credentialProvider,
allowControllerOnlyApis = true,
controllerSocketServer = true)
socketServer.startup(false, None, config.controllerListeners)
apiVersionManager)
socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
config.controllerListeners.head.listenerName))


@@ -61,7 +61,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, DescribeProducersResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
+import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, DescribeProducersResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -110,8 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val clusterId: String,
                 time: Time,
                 val tokenManager: DelegationTokenManager,
-                val brokerFeatures: BrokerFeatures,
-                val finalizedFeatureCache: FinalizedFeatureCache) extends ApiRequestHandler with Logging {
+                val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
 
   metadataSupport.ensureConsistentWith(config)
@@ -158,6 +157,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
       s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
+
+    if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
+      // The socket server will reject APIs which are not exposed in this scope and close the connection
+      // before handing them to the request handler, so this path should not be exercised in practice
+      throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
+    }
+
     request.header.apiKey match {
       case ApiKeys.PRODUCE => handleProduceRequest(request)
       case ApiKeys.FETCH => handleFetchRequest(request)
@@ -217,17 +222,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
       case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
       case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
-      // Handle requests that should have been sent to the KIP-500 controller.
-      // Until we are ready to integrate the Raft layer, these APIs are treated as
-      // unexpected and we just close the connection.
-      case ApiKeys.VOTE => requestHelper.closeConnection(request, util.Collections.emptyMap())
-      case ApiKeys.BEGIN_QUORUM_EPOCH => requestHelper.closeConnection(request, util.Collections.emptyMap())
-      case ApiKeys.END_QUORUM_EPOCH => requestHelper.closeConnection(request, util.Collections.emptyMap())
-      case ApiKeys.DESCRIBE_QUORUM => requestHelper.closeConnection(request, util.Collections.emptyMap())
-      case ApiKeys.FETCH_SNAPSHOT => requestHelper.closeConnection(request, util.Collections.emptyMap())
-      case ApiKeys.BROKER_REGISTRATION => requestHelper.closeConnection(request, util.Collections.emptyMap())
-      case ApiKeys.BROKER_HEARTBEAT => requestHelper.closeConnection(request, util.Collections.emptyMap())
       case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
     }
   } catch {
     case e: FatalExitError => throw e
@@ -1686,39 +1681,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     // ApiVersionRequest is not available.
     def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
       val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion)
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
-      else if (!apiVersionRequest.isValid)
+      } else if (!apiVersionRequest.isValid) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
-      else {
-        val supportedFeatures = brokerFeatures.supportedFeatures
-        val finalizedFeaturesOpt = finalizedFeatureCache.get
-        val controllerApiVersions = metadataSupport.forwardingManager.flatMap(_.controllerApiVersions)
-        val apiVersionsResponse =
-          finalizedFeaturesOpt match {
-            case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-              requestThrottleMs,
-              config.interBrokerProtocolVersion.recordVersion.value,
-              supportedFeatures,
-              finalizedFeatures.features,
-              finalizedFeatures.epoch,
-              controllerApiVersions)
-            case None => ApiVersion.apiVersionsResponse(
-              requestThrottleMs,
-              config.interBrokerProtocolVersion.recordVersion.value,
-              supportedFeatures,
-              controllerApiVersions)
-          }
-        if (request.context.fromPrivilegedListener) {
-          apiVersionsResponse.data.apiKeys().add(
-            new ApiVersionsResponseData.ApiVersion()
-              .setApiKey(ApiKeys.ENVELOPE.id)
-              .setMinVersion(ApiKeys.ENVELOPE.oldestVersion())
-              .setMaxVersion(ApiKeys.ENVELOPE.latestVersion())
-          )
-        }
-        apiVersionsResponse
+      } else {
+        apiVersionManager.apiVersionResponse(requestThrottleMs)
       }
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
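
With the feature-cache plumbing removed, handleApiVersionsRequest delegates entirely to the ApiVersionManager. A hedged sketch of the invariant this establishes, assuming SimpleApiVersionManager advertises exactly the APIs enabled for its listener type:

import kafka.server.SimpleApiVersionManager
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import scala.jdk.CollectionConverters._

object AdvertisedApisSketch extends App {
  val manager = new SimpleApiVersionManager(ListenerType.ZK_BROKER)
  val response = manager.apiVersionResponse(throttleTimeMs = 0)
  // Every advertised key should be one this listener actually serves.
  val advertised = response.data.apiKeys.asScala.map(k => ApiKeys.forId(k.apiKey)).toSet
  assert(advertised == ApiKeys.zkBrokerApis.asScala.toSet)
}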

core/src/main/scala/kafka/server/KafkaRaftServer.scala

@@ -74,8 +74,17 @@ class KafkaRaftServer(
   private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
 
   private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
-    Some(new BrokerServer(config, metaProps, metaLogShim, time, metrics, threadNamePrefix,
-      offlineDirs, controllerQuorumVotersFuture, Server.SUPPORTED_FEATURES))
+    Some(new BrokerServer(
+      config,
+      metaProps,
+      metaLogShim,
+      time,
+      metrics,
+      threadNamePrefix,
+      offlineDirs,
+      controllerQuorumVotersFuture,
+      Server.SUPPORTED_FEATURES
+    ))
   } else {
     None
   }
@@ -89,7 +98,7 @@
       time,
       metrics,
       threadNamePrefix,
-      CompletableFuture.completedFuture(config.quorumVoters)
+      controllerQuorumVotersFuture
     ))
   } else {
     None

core/src/main/scala/kafka/server/KafkaServer.scala

@@ -37,6 +37,7 @@ import kafka.utils._
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
@@ -157,7 +158,6 @@ class KafkaServer(
   private var _featureChangeListener: FinalizedFeatureChangeListener = null
 
   val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
-
   val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
 
   def clusterId: String = _clusterId
@@ -256,14 +256,32 @@ class KafkaServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+    if (enableForwarding) {
+      this.forwardingManager = Some(ForwardingManager(
+        config,
+        metadataCache,
+        time,
+        metrics,
+        threadNamePrefix
+      ))
+      forwardingManager.foreach(_.start())
+    }
+
+    val apiVersionManager = ApiVersionManager(
+      ListenerType.ZK_BROKER,
+      config,
+      forwardingManager,
+      brokerFeatures,
+      featureCache
+    )
+
     // Create and start the socket server acceptor threads so that the bound port is known.
     // Delay starting processors until the end of the initialization sequence to ensure
     // that credentials have been loaded before processing authentications.
     //
     // Note that we allow the use of KIP-500 controller APIs when forwarding is enabled
     // so that the Envelope request is exposed. This is only used in testing currently.
-    socketServer = new SocketServer(config, metrics, time, credentialProvider,
-      allowControllerOnlyApis = enableForwarding)
+    socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
     socketServer.startup(startProcessingRequests = false)
 
     /* start replica manager */
@@ -300,18 +318,6 @@ class KafkaServer(
     kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
     kafkaController.startup()
 
-    /* start forwarding manager */
-    if (enableForwarding) {
-      this.forwardingManager = Some(ForwardingManager(
-        config,
-        metadataCache,
-        time,
-        metrics,
-        threadNamePrefix
-      ))
-      forwardingManager.foreach(_.start())
-    }
-
     adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
 
     /* start group coordinator */
@@ -363,7 +369,7 @@ class KafkaServer(
     val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
     dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
       autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-      fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+      fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
 
     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
       config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
@@ -371,7 +377,7 @@ class KafkaServer(
     socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
       controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
         autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-        fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+        fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
       controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
         1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
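
Note the ordering above: the forwarding manager is created and started before the ApiVersionManager, which takes it as an input. That is what lets a forwarding broker advertise only the intersection of its own version ranges with the controller's. A toy model of that intersection, using illustrative names rather than Kafka's own types:

// Toy model: intersect a broker's local support with the controller's
// advertised support for a single API.
final case class VersionRange(min: Short, max: Short)

object IntersectionSketch extends App {
  def intersect(local: VersionRange, controller: VersionRange): Option[VersionRange] = {
    val lo = math.max(local.min, controller.min).toShort
    val hi = math.min(local.max, controller.max).toShort
    if (lo <= hi) Some(VersionRange(lo, hi)) else None
  }

  // The broker supports versions 0..7 but the controller only 0..5:
  // the broker should advertise 0..5 for a forwarded API.
  assert(intersect(VersionRange(0, 7), VersionRange(0, 5)).contains(VersionRange(0, 5)))
}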

core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala

@@ -20,7 +20,7 @@ package kafka.tools
 import kafka.network.RequestChannel
 import kafka.network.RequestConvertToJson
 import kafka.raft.RaftManager
-import kafka.server.ApiRequestHandler
+import kafka.server.{ApiRequestHandler, ApiVersionManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, VoteResponseData}
@@ -38,6 +38,7 @@ class TestRaftRequestHandler(
   raftManager: RaftManager[_],
   requestChannel: RequestChannel,
   time: Time,
+  apiVersionManager: ApiVersionManager
 ) extends ApiRequestHandler with Logging {
 
   override def handle(request: RequestChannel.Request): Unit = {
@@ -45,6 +46,7 @@
     trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
       s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
     request.header.apiKey match {
+      case ApiKeys.API_VERSIONS => handleApiVersions(request)
       case ApiKeys.VOTE => handleVote(request)
       case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
       case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
@@ -62,6 +64,10 @@
     }
   }
 
+  private def handleApiVersions(request: RequestChannel.Request): Unit = {
+    sendResponse(request, Some(apiVersionManager.apiVersionResponse(throttleTimeMs = 0)))
+  }
+
   private def handleVote(request: RequestChannel.Request): Unit = {
     handle(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData]))
   }

core/src/main/scala/kafka/tools/TestRaftServer.scala

@@ -24,9 +24,10 @@ import joptsimple.OptionException
 import kafka.network.SocketServer
 import kafka.raft.{KafkaRaftManager, RaftManager}
 import kafka.security.CredentialProvider
-import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties}
+import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager}
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Logging, ShutdownableThread}
 import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
 import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles}
@@ -68,7 +69,8 @@ class TestRaftServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-    socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = true)
+    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)
+    socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
     socketServer.startup(startProcessingRequests = false)
 
     val metaProperties = MetaProperties(
@@ -96,7 +98,8 @@ class TestRaftServer(
     val requestHandler = new TestRaftRequestHandler(
       raftManager,
       socketServer.dataPlaneRequestChannel,
-      time
+      time,
+      apiVersionManager
     )
 
     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(

core/src/test/scala/unit/kafka/admin/BrokerApiVersionsCommandTest.scala

@@ -55,7 +55,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
     assertTrue(lineIter.hasNext)
     assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
     val nodeApiVersions = NodeApiVersions.create
-    val enabledApis = ApiKeys.brokerApis.asScala
+    val enabledApis = ApiKeys.zkBrokerApis.asScala
     for (apiKey <- enabledApis) {
       val apiVersion = nodeApiVersions.apiVersion(apiKey)
       assertNotNull(apiVersion)

core/src/test/scala/integration/kafka/api/GssapiAuthenticationTest.scala

@@ -29,7 +29,9 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.SaslAuthenticationException
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.network._
+import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
 import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
 import org.apache.kafka.common.security.kerberos.KerberosLogin
@@ -233,7 +235,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     val config = new TestSecurityConfig(clientConfig)
     val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values()))
     val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol,
-      null, false, kafkaClientSaslMechanism, true, null, null, null, time, new LogContext()) {
+      null, false, kafkaClientSaslMechanism, true, null, null, null, time, new LogContext(),
+      () => ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) {
       override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin]
     }
     channelBuilder.configure(config.values())
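
The two added lines pass SaslChannelBuilder a supplier for the pre-authentication ApiVersionsResponse. Per the commit description, the response is now computed per request rather than served from a static snapshot, so feature changes made after startup are visible even before authentication completes. A minimal sketch of the pattern, assuming only the supplier shape shown in the hunk above:

import java.util.function.Supplier
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.requests.ApiVersionsResponse

object PreAuthSupplierSketch {
  // Re-evaluated on every pre-authentication ApiVersions request, so the
  // response tracks current state instead of a startup-time snapshot.
  val apiVersionSupplier: Supplier[ApiVersionsResponse] =
    () => ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.ZK_BROKER)
}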

core/src/test/scala/unit/kafka/api/ApiVersionTest.scala

@@ -20,6 +20,7 @@ package kafka.api
 import java.util
 
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.{RecordBatch, RecordVersion}
 import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse}
@@ -179,9 +180,10 @@
   def shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       10,
-      RecordBatch.MAGIC_VALUE_V1,
+      RecordVersion.V1,
       Features.emptySupportedFeatures,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
     verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
     assertEquals(10, response.throttleTimeMs)
@@ -194,13 +196,14 @@
   def shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       10,
-      RecordBatch.MAGIC_VALUE_V1,
+      RecordVersion.V1,
       Features.supportedFeatures(
         Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange(1.toShort, 4.toShort)))),
       Features.finalizedFeatures(
         Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange(2.toShort, 3.toShort)))),
       10,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
 
     verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
@@ -228,11 +231,12 @@
   def shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       AbstractResponse.DEFAULT_THROTTLE_TIME,
-      RecordBatch.CURRENT_MAGIC_VALUE,
+      RecordVersion.current(),
       Features.emptySupportedFeatures,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
-    assertEquals(new util.HashSet[ApiKeys](ApiKeys.brokerApis), apiKeysInResponse(response))
+    assertEquals(new util.HashSet[ApiKeys](ApiKeys.zkBrokerApis), apiKeysInResponse(response))
     assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs)
     assertTrue(response.data.supportedFeatures.isEmpty)
     assertTrue(response.data.finalizedFeatures.isEmpty)
@@ -243,9 +247,10 @@
   def testMetadataQuorumApisAreDisabled(): Unit = {
     val response = ApiVersion.apiVersionsResponse(
       AbstractResponse.DEFAULT_THROTTLE_TIME,
-      RecordBatch.CURRENT_MAGIC_VALUE,
+      RecordVersion.current(),
       Features.emptySupportedFeatures,
-      None
+      None,
+      ListenerType.ZK_BROKER
     )
 
     // Ensure that APIs needed for the internal metadata quorum (KIP-500)
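
testMetadataQuorumApisAreDisabled now pins the listener to ListenerType.ZK_BROKER, under which none of the Raft quorum APIs should be advertised. A hedged companion check, assuming ApiKeys.zkBrokerApis is exactly the set served on a ZK broker listener:

import org.apache.kafka.common.protocol.ApiKeys
import scala.jdk.CollectionConverters._

object QuorumApisDisabledSketch extends App {
  // The quorum APIs belong to the controller listener only.
  val quorumApis = Set(ApiKeys.VOTE, ApiKeys.BEGIN_QUORUM_EPOCH,
    ApiKeys.END_QUORUM_EPOCH, ApiKeys.DESCRIBE_QUORUM, ApiKeys.FETCH_SNAPSHOT)
  assert(ApiKeys.zkBrokerApis.asScala.toSet.intersect(quorumApis).isEmpty)
}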

Some files were not shown because too many files have changed in this diff.