KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics (#9103)

This PR adds support for forwarding of the following RPCs:

AlterConfigs
IncrementalAlterConfigs
AlterClientQuotas
CreateTopics

Co-authored-by: Jason Gustafson <jason@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Boyang Chen 2020-11-04 14:21:44 -08:00 committed by GitHub
parent 5df8457e05
commit 0814e4f645
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
70 changed files with 2247 additions and 497 deletions

View File

@ -103,6 +103,8 @@
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.errors" />
<!-- To access DefaultPrincipalData -->
<allow pkg="org.apache.kafka.common.message" />
<subpackage name="authenticator">
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol.types" />
@ -160,6 +162,8 @@
<allow pkg="org.apache.kafka.common.record" />
<!-- for AuthorizableRequestContext interface -->
<allow pkg="org.apache.kafka.server.authorizer" />
<!-- for IncrementalAlterConfigsRequest Builder -->
<allow pkg="org.apache.kafka.clients.admin" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>

View File

@ -85,11 +85,11 @@ public final class ClientRequest {
public RequestHeader makeHeader(short version) {
short requestApiKey = requestBuilder.apiKey().id;
return new RequestHeader(
new RequestHeaderData().
setRequestApiKey(requestApiKey).
setRequestApiVersion(version).
setClientId(clientId).
setCorrelationId(correlationId),
new RequestHeaderData()
.setRequestApiKey(requestApiKey)
.setRequestApiVersion(version)
.setClientId(clientId)
.setCorrelationId(correlationId),
ApiKeys.forId(requestApiKey).requestHeaderVersion(version));
}

View File

@ -22,6 +22,6 @@ package org.apache.kafka.clients;
*/
public interface RequestCompletionHandler {
public void onComplete(ClientResponse response);
void onComplete(ClientResponse response);
}

View File

@ -122,10 +122,6 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.U
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsRequestData;
@ -2217,7 +2213,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options) {
final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
// We must make a separate AlterConfigs request for every BROKER resource we want to alter
// and send the request to that specific broker. Other resources are grouped together into
@ -2239,9 +2235,9 @@ public class KafkaAdminClient extends AdminClient {
}
private Map<ConfigResource, KafkaFutureImpl<Void>> incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options,
Collection<ConfigResource> resources,
NodeProvider nodeProvider) {
final AlterConfigsOptions options,
Collection<ConfigResource> resources,
NodeProvider nodeProvider) {
final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>();
for (ConfigResource resource : resources)
futures.put(resource, new KafkaFutureImpl<>());
@ -2251,8 +2247,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public IncrementalAlterConfigsRequest.Builder createRequest(int timeoutMs) {
return new IncrementalAlterConfigsRequest.Builder(
toIncrementalAlterConfigsRequestData(resources, configs, options.shouldValidateOnly()));
return new IncrementalAlterConfigsRequest.Builder(resources, configs, options.shouldValidateOnly());
}
@Override
@ -2278,27 +2273,6 @@ public class KafkaAdminClient extends AdminClient {
return futures;
}
private IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection<ConfigResource> resources,
final Map<ConfigResource, Collection<AlterConfigOp>> configs,
final boolean validateOnly) {
IncrementalAlterConfigsRequestData requestData = new IncrementalAlterConfigsRequestData();
requestData.setValidateOnly(validateOnly);
for (ConfigResource resource : resources) {
AlterableConfigCollection alterableConfigSet = new AlterableConfigCollection();
for (AlterConfigOp configEntry : configs.get(resource))
alterableConfigSet.add(new AlterableConfig().
setName(configEntry.configEntry().name()).
setValue(configEntry.configEntry().value()).
setConfigOperation(configEntry.opType().id()));
AlterConfigsResource alterConfigsResource = new AlterConfigsResource();
alterConfigsResource.setResourceType(resource.type().id()).
setResourceName(resource.name()).setConfigs(alterableConfigSet);
requestData.resources().add(alterConfigsResource);
}
return requestData;
}
@Override
public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options) {
final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());

View File

@ -127,7 +127,7 @@ public class ConsumerNetworkClient implements Closeable {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
requestTimeoutMs, completionHandler);
requestTimeoutMs, completionHandler);
unsent.put(node, clientRequest);
// wakeup the client in case it is blocking in poll so that we can send the queued request

View File

@ -469,8 +469,8 @@ public class Sender implements Runnable {
time.sleep(nextRequestHandler.retryBackoffMs());
long currentTimeMs = time.milliseconds();
ClientRequest clientRequest = client.newClientRequest(
targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), requestBuilder, currentTimeMs,
true, requestTimeoutMs, nextRequestHandler);
log.debug("Sending transactional request {} to node {} with correlation ID {}", requestBuilder, targetNode, clientRequest.correlationId());
client.send(clientRequest, currentTimeMs);
transactionManager.setInFlightCorrelationId(clientRequest.correlationId());

View File

@ -345,6 +345,17 @@ public class AbstractConfig {
return new RecordingMap<>(values);
}
public Map<String, ?> nonInternalValues() {
Map<String, Object> nonInternalConfigs = new RecordingMap<>();
values.forEach((key, value) -> {
ConfigDef.ConfigKey configKey = definition.configKeys().get(key);
if (configKey == null || !configKey.internalConfig) {
nonInternalConfigs.put(key, value);
}
});
return nonInternalConfigs;
}
private void logAll() {
StringBuilder b = new StringBuilder();
b.append(getClass().getSimpleName());

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* Exception used to indicate a kafka principal deserialization failure during request forwarding.
*/
public class PrincipalDeserializationException extends ApiException {
private static final long serialVersionUID = 1L;
public PrincipalDeserializationException(String message) {
super(message);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import java.io.Closeable;
import java.io.IOException;
@ -51,6 +52,11 @@ public interface Authenticator extends Closeable {
*/
KafkaPrincipal principal();
/**
* Returns the serializer/deserializer interface for principal
*/
Optional<KafkaPrincipalSerde> principalSerde();
/**
* returns true if authentication is complete otherwise returns false;
*/

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.utils.Utils;
import java.io.IOException;
@ -161,6 +162,10 @@ public class KafkaChannel implements AutoCloseable {
return authenticator.principal();
}
public Optional<KafkaPrincipalSerde> principalSerde() {
return authenticator.principalSerde();
}
/**
* Does handshake of transportLayer and authentication using configured authenticator.
* For SSL with client authentication enabled, {@link TransportLayer#handshake()} performs

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@ -30,6 +31,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
public class PlaintextChannelBuilder implements ChannelBuilder {
@ -99,6 +101,11 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress, listenerName.value()));
}
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
}
@Override
public boolean complete() {
return true;

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
@ -36,6 +37,7 @@ import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
@ -201,6 +203,11 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
return principalBuilder.build(context);
}
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
}
@Override
public void close() throws IOException {
if (principalBuilder instanceof Closeable)

View File

@ -77,6 +77,8 @@ import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.EnvelopeRequestData;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.FetchRequestData;
@ -185,7 +187,7 @@ public enum ApiKeys {
return parseResponse(version, buffer, (short) 0);
}
},
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS, true),
DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS),
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequestData.SCHEMAS, DeleteRecordsResponseData.SCHEMAS),
INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequestData.SCHEMAS, InitProducerIdResponseData.SCHEMAS),
@ -206,7 +208,7 @@ public enum ApiKeys {
DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequestData.SCHEMAS,
DescribeConfigsResponseData.SCHEMAS),
ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequestData.SCHEMAS,
AlterConfigsResponseData.SCHEMAS),
AlterConfigsResponseData.SCHEMAS, true),
ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequestData.SCHEMAS,
AlterReplicaLogDirsResponseData.SCHEMAS),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequestData.SCHEMAS,
@ -227,7 +229,7 @@ public enum ApiKeys {
ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
ElectLeadersResponseData.SCHEMAS),
INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
IncrementalAlterConfigsResponseData.SCHEMAS),
IncrementalAlterConfigsResponseData.SCHEMAS, true),
ALTER_PARTITION_REASSIGNMENTS(45, "AlterPartitionReassignments", AlterPartitionReassignmentsRequestData.SCHEMAS,
AlterPartitionReassignmentsResponseData.SCHEMAS),
LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS,
@ -236,7 +238,7 @@ public enum ApiKeys {
DESCRIBE_CLIENT_QUOTAS(48, "DescribeClientQuotas", DescribeClientQuotasRequestData.SCHEMAS,
DescribeClientQuotasResponseData.SCHEMAS),
ALTER_CLIENT_QUOTAS(49, "AlterClientQuotas", AlterClientQuotasRequestData.SCHEMAS,
AlterClientQuotasResponseData.SCHEMAS),
AlterClientQuotasResponseData.SCHEMAS, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(50, "DescribeUserScramCredentials", DescribeUserScramCredentialsRequestData.SCHEMAS,
DescribeUserScramCredentialsResponseData.SCHEMAS),
ALTER_USER_SCRAM_CREDENTIALS(51, "AlterUserScramCredentials", AlterUserScramCredentialsRequestData.SCHEMAS,
@ -251,7 +253,8 @@ public enum ApiKeys {
DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
UPDATE_FEATURES(57, "UpdateFeatures",
UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS);
UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS),
ENVELOPE(58, "Envelope", true, false, EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
@ -283,6 +286,9 @@ public enum ApiKeys {
/** indicates whether the API is enabled and should be exposed in ApiVersions **/
public final boolean isEnabled;
/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
public final Schema[] requestSchemas;
public final Schema[] responseSchemas;
public final boolean requiresDelayedAllocation;
@ -295,13 +301,17 @@ public enum ApiKeys {
this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, requestSchemas, responseSchemas);
}
ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas, boolean forwardable) {
this(id, name, false, RecordBatch.MAGIC_VALUE_V0, true, requestSchemas, responseSchemas, forwardable);
}
ApiKeys(int id, String name, boolean clusterAction, boolean isEnabled, Schema[] requestSchemas, Schema[] responseSchemas) {
this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, isEnabled, requestSchemas, responseSchemas);
this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, isEnabled, requestSchemas, responseSchemas, false);
}
ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic,
Schema[] requestSchemas, Schema[] responseSchemas) {
this(id, name, clusterAction, minRequiredInterBrokerMagic, true, requestSchemas, responseSchemas);
this(id, name, clusterAction, minRequiredInterBrokerMagic, true, requestSchemas, responseSchemas, false);
}
ApiKeys(
@ -311,7 +321,8 @@ public enum ApiKeys {
byte minRequiredInterBrokerMagic,
boolean isEnabled,
Schema[] requestSchemas,
Schema[] responseSchemas
Schema[] responseSchemas,
boolean forwardable
) {
if (id < 0)
throw new IllegalArgumentException("id must not be negative, id: " + id);
@ -332,6 +343,13 @@ public enum ApiKeys {
throw new IllegalStateException("Response schema for api " + name + " for version " + i + " is null");
}
this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(requestSchemas);
this.requestSchemas = requestSchemas;
this.responseSchemas = responseSchemas;
this.forwardable = forwardable;
}
private static boolean shouldRetainsBufferReference(Schema[] requestSchemas) {
boolean requestRetainsBufferReference = false;
for (Schema requestVersionSchema : requestSchemas) {
if (retainsBufferReference(requestVersionSchema)) {
@ -339,9 +357,7 @@ public enum ApiKeys {
break;
}
}
this.requiresDelayedAllocation = requestRetainsBufferReference;
this.requestSchemas = requestSchemas;
this.responseSchemas = responseSchemas;
return requestRetainsBufferReference;
}
public static ApiKeys forId(int id) {

View File

@ -84,6 +84,7 @@ import org.apache.kafka.common.errors.OperationNotAttemptedException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException;
import org.apache.kafka.common.errors.PrincipalDeserializationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.ReassignmentInProgressException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@ -338,7 +339,9 @@ public enum Errors {
INCONSISTENT_VOTER_SET(94, "Indicates that the either the sender or recipient of a " +
"voter-only request is not one of the expected voters", InconsistentVoterSetException::new),
INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new),
FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new);
FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new),
PRINCIPAL_DESERIALIZATION_FAILURE(97, "Request principal deserialization failed during forwarding. " +
"This indicates an internal error on the broker cluster security setup.", PrincipalDeserializationException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -261,6 +261,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return new AlterIsrRequest(new AlterIsrRequestData(struct, apiVersion), apiVersion);
case UPDATE_FEATURES:
return new UpdateFeaturesRequest(struct, apiVersion);
case ENVELOPE:
return new EnvelopeRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.network.NetworkSend;
@ -40,7 +41,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
}
/**
* Visible for testing, typically {@link #toSend(String, ResponseHeader, short)} should be used instead.
* Used for forwarding response serialization, typically {@link #toSend(String, ResponseHeader, short)}
* should be used instead.
*/
public ByteBuffer serialize(short version, ResponseHeader responseHeader) {
return RequestUtils.serialize(responseHeader.toStruct(), toStruct(version));
@ -86,6 +88,24 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
protected abstract Struct toStruct(short version);
public ByteBuffer serializeBody(short version) {
Struct dataStruct = toStruct(version);
ByteBuffer buffer = ByteBuffer.allocate(dataStruct.sizeOf());
dataStruct.writeTo(buffer);
buffer.flip();
return buffer;
}
public static AbstractResponse deserializeBody(ByteBuffer byteBuffer, RequestHeader header) {
ApiKeys apiKey = header.apiKey();
short apiVersion = header.apiVersion();
ResponseHeader.parse(byteBuffer, apiKey.responseHeaderVersion(apiVersion));
Struct struct = apiKey.parseResponse(apiVersion, byteBuffer);
return AbstractResponse.parseResponse(apiKey, struct, apiVersion);
}
public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) {
switch (apiKey) {
case PRODUCE:
@ -204,6 +224,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return new AlterIsrResponse(new AlterIsrResponseData(struct, version));
case UPDATE_FEATURES:
return new UpdateFeaturesResponse(struct, version);
case ENVELOPE:
return new EnvelopeResponse(new EnvelopeResponseData(struct, version));
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -70,13 +70,14 @@ public class AlterConfigsRequest extends AbstractRequest {
super(ApiKeys.ALTER_CONFIGS);
Objects.requireNonNull(configs, "configs");
for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
AlterConfigsRequestData.AlterConfigsResource resource = new AlterConfigsRequestData.AlterConfigsResource()
AlterConfigsRequestData.AlterConfigsResource resource =
new AlterConfigsRequestData.AlterConfigsResource()
.setResourceName(entry.getKey().name())
.setResourceType(entry.getKey().type().id());
for (ConfigEntry x : entry.getValue().entries) {
resource.configs().add(new AlterConfigsRequestData.AlterableConfig()
.setName(x.name())
.setValue(x.value()));
.setName(x.name())
.setValue(x.value()));
}
this.data.resources().add(resource);
}

View File

@ -39,6 +39,10 @@ public class AlterConfigsResponse extends AbstractResponse {
this.data = new AlterConfigsResponseData(struct, version);
}
public AlterConfigsResponseData data() {
return data;
}
public Map<ConfigResource, ApiError> errors() {
return data.responses().stream().collect(Collectors.toMap(
response -> new ConfigResource(

View File

@ -51,7 +51,8 @@ public class ApiVersionsResponse extends AbstractResponse {
RecordBatch.CURRENT_MAGIC_VALUE,
Features.emptySupportedFeatures(),
Features.emptyFinalizedFeatures(),
UNKNOWN_FINALIZED_FEATURES_EPOCH);
UNKNOWN_FINALIZED_FEATURES_EPOCH
);
public final ApiVersionsResponseData data;
@ -131,33 +132,6 @@ public class ApiVersionsResponse extends AbstractResponse {
}
}
public static ApiVersionsResponse apiVersionsResponse(
int throttleTimeMs,
byte maxMagic,
Features<SupportedVersionRange> latestSupportedFeatures) {
return apiVersionsResponse(
throttleTimeMs, maxMagic, latestSupportedFeatures, Features.emptyFinalizedFeatures(), UNKNOWN_FINALIZED_FEATURES_EPOCH);
}
public static ApiVersionsResponse apiVersionsResponse(
int throttleTimeMs,
byte maxMagic,
Features<SupportedVersionRange> latestSupportedFeatures,
Features<FinalizedVersionRange> finalizedFeatures,
long finalizedFeaturesEpoch) {
if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) {
return new ApiVersionsResponse(createApiVersionsResponseData(
DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs(),
Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data().errorCode()),
DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(),
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch));
}
return createApiVersionsResponse(
throttleTimeMs, maxMagic, latestSupportedFeatures, finalizedFeatures, finalizedFeaturesEpoch);
}
public static ApiVersionsResponse createApiVersionsResponse(
final int throttleTimeMs,
final byte minMagic) {
@ -166,7 +140,8 @@ public class ApiVersionsResponse extends AbstractResponse {
minMagic,
Features.emptySupportedFeatures(),
Features.emptyFinalizedFeatures(),
UNKNOWN_FINALIZED_FEATURES_EPOCH);
UNKNOWN_FINALIZED_FEATURES_EPOCH
);
}
private static ApiVersionsResponse createApiVersionsResponse(
@ -174,28 +149,30 @@ public class ApiVersionsResponse extends AbstractResponse {
final byte minMagic,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Features<FinalizedVersionRange> finalizedFeatures,
final long finalizedFeaturesEpoch
) {
ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection();
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
apiKeys.add(new ApiVersionsResponseKey()
.setApiKey(apiKey.id)
.setMinVersion(apiKey.oldestVersion())
.setMaxVersion(apiKey.latestVersion()));
}
}
final long finalizedFeaturesEpoch) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
throttleTimeMs,
Errors.NONE,
apiKeys,
defaultApiKeys(minMagic),
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch));
}
public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagic) {
ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection();
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
apiKeys.add(new ApiVersionsResponseKey()
.setApiKey(apiKey.id)
.setMinVersion(apiKey.oldestVersion())
.setMaxVersion(apiKey.latestVersion()));
}
}
return apiKeys;
}
public static ApiVersionsResponseData createApiVersionsResponseData(
final int throttleTimeMs,
final Errors error,

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.EnvelopeRequestData;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class EnvelopeRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<EnvelopeRequest> {
private final EnvelopeRequestData data;
public Builder(ByteBuffer requestData,
byte[] serializedPrincipal,
byte[] clientAddress) {
super(ApiKeys.ENVELOPE);
this.data = new EnvelopeRequestData()
.setRequestData(requestData)
.setRequestPrincipal(ByteBuffer.wrap(serializedPrincipal))
.setClientHostAddress(clientAddress);
}
@Override
public EnvelopeRequest build(short version) {
return new EnvelopeRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final EnvelopeRequestData data;
public EnvelopeRequest(EnvelopeRequestData data, short version) {
super(ApiKeys.ENVELOPE, version);
this.data = data;
}
public EnvelopeRequest(Struct struct, short version) {
super(ApiKeys.ENVELOPE, version);
this.data = new EnvelopeRequestData(struct, version);
}
public ByteBuffer requestData() {
return data.requestData();
}
public byte[] clientAddress() {
return data.clientHostAddress();
}
public byte[] principalData() {
byte[] serializedPrincipal = new byte[data.requestPrincipal().limit()];
data.requestPrincipal().get(serializedPrincipal);
return serializedPrincipal;
}
@Override
protected Struct toStruct() {
return data.toStruct(version());
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new EnvelopeResponse(new EnvelopeResponseData()
.setErrorCode(Errors.forException(e).code()));
}
public static EnvelopeRequest parse(ByteBuffer buffer, short version) {
return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, buffer), version);
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Map;
public class EnvelopeResponse extends AbstractResponse {
private final EnvelopeResponseData data;
public EnvelopeResponse(ByteBuffer responseData,
Errors error) {
this.data = new EnvelopeResponseData()
.setResponseData(responseData)
.setErrorCode(error.code());
}
public EnvelopeResponse(Errors error) {
this(null, error);
}
public EnvelopeResponse(EnvelopeResponseData data) {
this.data = data;
}
public ByteBuffer responseData() {
return data.responseData();
}
@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(error());
}
@Override
protected Struct toStruct(short version) {
return data.toStruct(version);
}
public Errors error() {
return Errors.forCode(data.errorCode());
}
}

View File

@ -17,6 +17,8 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
@ -25,6 +27,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
public class IncrementalAlterConfigsRequest extends AbstractRequest {
@ -36,6 +40,32 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest {
this.data = data;
}
public Builder(final Collection<ConfigResource> resources,
final Map<ConfigResource, Collection<AlterConfigOp>> configs,
final boolean validateOnly) {
super(ApiKeys.INCREMENTAL_ALTER_CONFIGS);
this.data = new IncrementalAlterConfigsRequestData()
.setValidateOnly(validateOnly);
for (ConfigResource resource : resources) {
IncrementalAlterConfigsRequestData.AlterableConfigCollection alterableConfigSet =
new IncrementalAlterConfigsRequestData.AlterableConfigCollection();
for (AlterConfigOp configEntry : configs.get(resource))
alterableConfigSet.add(new IncrementalAlterConfigsRequestData.AlterableConfig()
.setName(configEntry.configEntry().name())
.setValue(configEntry.configEntry().value())
.setConfigOperation(configEntry.opType().id()));
IncrementalAlterConfigsRequestData.AlterConfigsResource alterConfigsResource = new IncrementalAlterConfigsRequestData.AlterConfigsResource();
alterConfigsResource.setResourceType(resource.type().id())
.setResourceName(resource.name()).setConfigs(alterableConfigSet);
data.resources().add(alterConfigsResource);
}
}
public Builder(final Map<ConfigResource, Collection<AlterConfigOp>> configs,
final boolean validateOnly) {
this(configs.keySet(), configs, validateOnly);
}
@Override
public IncrementalAlterConfigsRequest build(short version) {
return new IncrementalAlterConfigsRequest(data, version);

View File

@ -25,23 +25,28 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class IncrementalAlterConfigsResponse extends AbstractResponse {
public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs,
final Map<ConfigResource, ApiError> results) {
IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
responseData.setThrottleTimeMs(requestThrottleMs);
for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
responseData.responses().add(new AlterConfigsResourceResponse().
setResourceName(entry.getKey().name()).
setResourceType(entry.getKey().type().id()).
setErrorCode(entry.getValue().error().code()).
setErrorMessage(entry.getValue().message()));
}
return responseData;
public IncrementalAlterConfigsResponse(final int requestThrottleMs,
final Map<ConfigResource, ApiError> results) {
final List<AlterConfigsResourceResponse> newResults = new ArrayList<>(results.size());
results.forEach(
(resource, error) -> newResults.add(
new AlterConfigsResourceResponse()
.setErrorCode(error.error().code())
.setErrorMessage(error.message())
.setResourceName(resource.name())
.setResourceType(resource.type().id()))
);
this.data = new IncrementalAlterConfigsResponseData()
.setResponses(newResults)
.setThrottleTimeMs(requestThrottleMs);
}
public static Map<ConfigResource, ApiError> fromResponseData(final IncrementalAlterConfigsResponseData data) {

View File

@ -24,11 +24,13 @@ import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import java.util.Optional;
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;
@ -40,6 +42,8 @@ public class RequestContext implements AuthorizableRequestContext {
public final ListenerName listenerName;
public final SecurityProtocol securityProtocol;
public final ClientInformation clientInformation;
public final boolean fromPrivilegedListener;
public final Optional<KafkaPrincipalSerde> principalSerde;
public RequestContext(RequestHeader header,
String connectionId,
@ -47,7 +51,28 @@ public class RequestContext implements AuthorizableRequestContext {
KafkaPrincipal principal,
ListenerName listenerName,
SecurityProtocol securityProtocol,
ClientInformation clientInformation) {
ClientInformation clientInformation,
boolean fromPrivilegedListener) {
this(header,
connectionId,
clientAddress,
principal,
listenerName,
securityProtocol,
clientInformation,
fromPrivilegedListener,
Optional.empty());
}
public RequestContext(RequestHeader header,
String connectionId,
InetAddress clientAddress,
KafkaPrincipal principal,
ListenerName listenerName,
SecurityProtocol securityProtocol,
ClientInformation clientInformation,
boolean fromPrivilegedListener,
Optional<KafkaPrincipalSerde> principalSerde) {
this.header = header;
this.connectionId = connectionId;
this.clientAddress = clientAddress;
@ -55,6 +80,8 @@ public class RequestContext implements AuthorizableRequestContext {
this.listenerName = listenerName;
this.securityProtocol = securityProtocol;
this.clientInformation = clientInformation;
this.fromPrivilegedListener = fromPrivilegedListener;
this.principalSerde = principalSerde;
}
public RequestAndSize parseRequest(ByteBuffer buffer) {

View File

@ -36,12 +36,15 @@ public class RequestHeader implements AbstractRequestResponse {
this(new RequestHeaderData(struct, headerVersion), headerVersion);
}
public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
this(new RequestHeaderData().
setRequestApiKey(requestApiKey.id).
setRequestApiVersion(requestVersion).
setClientId(clientId).
setCorrelationId(correlationId),
public RequestHeader(ApiKeys requestApiKey,
short requestVersion,
String clientId,
int correlationId) {
this(new RequestHeaderData()
.setRequestApiKey(requestApiKey.id)
.setRequestApiVersion(requestVersion)
.setClientId(clientId)
.setCorrelationId(correlationId),
ApiKeys.forId(requestApiKey.id).requestHeaderVersion(requestVersion));
}

View File

@ -51,8 +51,13 @@ public class KafkaPrincipal implements Principal {
private volatile boolean tokenAuthenticated;
public KafkaPrincipal(String principalType, String name) {
this(principalType, name, false);
}
public KafkaPrincipal(String principalType, String name, boolean tokenAuthenticated) {
this.principalType = requireNonNull(principalType, "Principal type cannot be null");
this.name = requireNonNull(name, "Principal name cannot be null");
this.tokenAuthenticated = tokenAuthenticated;
}
/**

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.auth;
import org.apache.kafka.common.errors.SerializationException;
/**
* Serializer/Deserializer interface for {@link KafkaPrincipal} for the the purpose of inter-broker forwarding.
* Any serialization/deserialization failure should raise a {@link SerializationException} to be consistent.
*/
public interface KafkaPrincipalSerde {
/**
* Serialize a {@link KafkaPrincipal} into byte array.
*
* @param principal principal to be serialized
* @return serialized bytes
* @throws SerializationException
*/
byte[] serialize(KafkaPrincipal principal) throws SerializationException;
/**
* Deserialize a {@link KafkaPrincipal} from byte array.
* @param bytes byte array to be deserialized
* @return the deserialized principal
* @throws SerializationException
*/
KafkaPrincipal deserialize(byte[] bytes) throws SerializationException;
}

View File

@ -17,13 +17,18 @@
package org.apache.kafka.common.security.authenticator;
import javax.security.auth.x500.X500Principal;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.message.DefaultPrincipalData;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
@ -37,6 +42,7 @@ import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Principal;
import static java.util.Objects.requireNonNull;
@ -51,7 +57,7 @@ import static java.util.Objects.requireNonNull;
* of {@link KafkaPrincipalBuilder}, there is no default no-arg constructor since this class
* must adapt implementations of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder} interface.
*/
public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Closeable {
public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde, Closeable {
// Use FQN to avoid import deprecation warnings
@SuppressWarnings("deprecation")
private final org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder;
@ -161,10 +167,36 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName());
}
@Override
public byte[] serialize(KafkaPrincipal principal) {
DefaultPrincipalData data = new DefaultPrincipalData()
.setType(principal.getPrincipalType())
.setName(principal.getName())
.setTokenAuthenticated(principal.tokenAuthenticated());
Struct dataStruct = data.toStruct(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
ByteBuffer buffer = ByteBuffer.allocate(2 + dataStruct.sizeOf());
buffer.putShort(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
dataStruct.writeTo(buffer);
return buffer.array();
}
@Override
public KafkaPrincipal deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
short version = buffer.getShort();
if (version < 0 || version >= DefaultPrincipalData.SCHEMAS.length) {
throw new SerializationException("Invalid principal data version " + version);
}
DefaultPrincipalData data = new DefaultPrincipalData(
DefaultPrincipalData.SCHEMAS[version].read(buffer),
version);
return new KafkaPrincipal(data.type(), data.name(), data.tokenAuthenticated());
}
@Override
public void close() {
if (oldPrincipalBuilder != null)
oldPrincipalBuilder.close();
}
}

View File

@ -46,6 +46,7 @@ import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.kerberos.KerberosError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -482,6 +483,11 @@ public class SaslClientAuthenticator implements Authenticator {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName);
}
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
return Optional.empty();
}
public boolean complete() {
return saslState == SaslState.COMPLETE;
}

View File

@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.kerberos.KerberosError;
@ -81,6 +82,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
public class SaslServerAuthenticator implements Authenticator {
// GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
@ -299,6 +301,11 @@ public class SaslServerAuthenticator implements Authenticator {
return principal;
}
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
}
@Override
public boolean complete() {
return saslState == SaslState.COMPLETE;
@ -410,7 +417,7 @@ public class SaslServerAuthenticator implements Authenticator {
ApiKeys apiKey = header.apiKey();
short version = header.apiVersion();
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY);
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
IllegalSaslStateException e = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication.");
@ -496,7 +503,7 @@ public class SaslServerAuthenticator implements Authenticator {
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY);
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
if (apiKey == ApiKeys.API_VERSIONS)
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);

View File

@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"type": "data",
"name": "DefaultPrincipalData",
// The encoding format for default Kafka principal in
// org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{"name": "Type", "type": "string", "versions": "0+",
"about": "The principal type"},
{"name": "Name", "type": "string", "versions": "0+",
"about": "The principal name"},
{"name": "TokenAuthenticated", "type": "bool", "versions": "0+",
"about": "Whether the principal was authenticated by a delegation token on the forwarding broker."}
]
}

View File

@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 58,
"type": "request",
"name": "EnvelopeRequest",
// Request struct for forwarding.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": true,
"about": "The embedded request header and data."},
{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", "zeroCopy": true, "nullableVersions": "0+",
"about": "Value of the initial client principal when the request is redirected by a broker." },
{ "name": "ClientHostAddress", "type": "bytes", "versions": "0+",
"about": "The original client's address in bytes." }
]
}

View File

@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 58,
"type": "response",
"name": "EnvelopeResponse",
// Response struct for forwarding.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ResponseData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
"zeroCopy": true, "default": "null",
"about": "The embedded response header and data."},
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]
}

View File

@ -38,5 +38,4 @@ public class ApiVersionsTest {
apiVersions.remove("1");
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
}
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@ -31,7 +30,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@ -43,14 +41,12 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -123,7 +119,7 @@ public class NetworkClientTest {
@Test(expected = IllegalStateException.class)
public void testSendToUnreadyNode() {
MetadataRequest.Builder builder = new MetadataRequest.Builder(Arrays.asList("test"), true);
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.singletonList("test"), true);
long now = time.milliseconds();
ClientRequest request = client.newClientRequest("5", builder, now, false);
client.send(request, now);
@ -159,7 +155,7 @@ public class NetworkClientTest {
assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.<TopicPartition, MemoryRecords>emptyMap());
Collections.emptyMap());
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
client.send(request, time.milliseconds());
assertEquals("There should be 1 in-flight request after send", 1,
@ -176,7 +172,7 @@ public class NetworkClientTest {
@Test
public void testUnsupportedVersionDuringInternalMetadataRequest() {
List<String> topics = Arrays.asList("topic_1");
List<String> topics = Collections.singletonList("topic_1");
// disabling auto topic creation for versions less than 4 is not supported
MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false, (short) 3);
@ -194,8 +190,8 @@ public class NetworkClientTest {
Collections.emptyMap(),
null);
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = networkClient.newClientRequest(
node.idString(), builder, time.milliseconds(), true, defaultRequestTimeoutMs, handler);
ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(),
true, defaultRequestTimeoutMs, handler);
networkClient.send(request, time.milliseconds());
networkClient.poll(1, time.milliseconds());
assertEquals(1, networkClient.inFlightRequestCount());
@ -820,7 +816,7 @@ public class NetworkClientTest {
// metadata request when the remote node disconnects with the request in-flight.
awaitReady(client, node);
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
long now = time.milliseconds();
ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
client.send(request, now);
@ -864,16 +860,11 @@ public class NetworkClientTest {
assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
client.isReady(node, time.milliseconds()));
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
long now = time.milliseconds();
final List<ClientResponse> callbackResponses = new ArrayList<>();
RequestCompletionHandler callback = new RequestCompletionHandler() {
@Override
public void onComplete(ClientResponse response) {
callbackResponses.add(response);
}
};
RequestCompletionHandler callback = response -> callbackResponses.add(response);
ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, callback);
client.send(request1, now);
@ -944,12 +935,9 @@ public class NetworkClientTest {
private void awaitInFlightApiVersionRequest() throws Exception {
client.ready(node, time.milliseconds());
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
client.poll(0, time.milliseconds());
return client.hasInFlightRequests(node.idString());
}
TestUtils.waitForCondition(() -> {
client.poll(0, time.milliseconds());
return client.hasInFlightRequests(node.idString());
}, 1000, "");
assertFalse(client.isReady(node, time.milliseconds()));
}

View File

@ -2058,11 +2058,8 @@ public class FetcherTest {
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, ClientDnsLookup.USE_ALL_DNS_IPS,
time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
ByteBuffer buffer = ApiVersionsResponse.
createApiVersionsResponse(
400,
RecordBatch.CURRENT_MAGIC_VALUE
).serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(
400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
while (!client.ready(node, time.milliseconds())) {
client.poll(1, time.milliseconds());

View File

@ -273,9 +273,7 @@ public class SenderTest {
time, true, new ApiVersions(), throttleTimeSensor, logContext);
ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(
400,
RecordBatch.CURRENT_MAGIC_VALUE
).serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
while (!client.ready(node, time.milliseconds())) {
client.poll(1, time.milliseconds());
@ -2298,8 +2296,7 @@ public class SenderTest {
InOrder inOrder = inOrder(client);
inOrder.verify(client, atLeastOnce()).ready(any(), anyLong());
inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(),
any());
inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(), any());
inOrder.verify(client, atLeastOnce()).send(any(), anyLong());
inOrder.verify(client).poll(eq(0L), anyLong());
inOrder.verify(client).poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong());

View File

@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
public class ProtoUtilsTest {
@Test
public void testDelayedAllocationSchemaDetection() throws Exception {
public void testDelayedAllocationSchemaDetection() {
//verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
for (ApiKeys key : ApiKeys.values()) {
switch (key) {
@ -34,10 +34,17 @@ public class ProtoUtilsTest {
case EXPIRE_DELEGATION_TOKEN:
case RENEW_DELEGATION_TOKEN:
case ALTER_USER_SCRAM_CREDENTIALS:
case ENVELOPE:
assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation);
break;
default:
assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation);
if (key.forwardable) {
assertTrue(key + " should require delayed allocation since it is forwardable",
key.requiresDelayedAllocation);
} else {
assertFalse(key + " should not require delayed allocation",
key.requiresDelayedAllocation);
}
break;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class AbstractResponseTest {
@Test
public void testResponseSerde() {
CreateTopicsResponseData.CreatableTopicResultCollection collection =
new CreateTopicsResponseData.CreatableTopicResultCollection();
collection.add(new CreateTopicsResponseData.CreatableTopicResult()
.setTopicConfigErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
.setNumPartitions(5));
CreateTopicsResponse createTopicsResponse = new CreateTopicsResponse(
new CreateTopicsResponseData()
.setThrottleTimeMs(10)
.setTopics(collection)
);
final short version = (short) (CreateTopicsResponseData.SCHEMAS.length - 1);
final RequestHeader header = new RequestHeader(ApiKeys.CREATE_TOPICS, version, "client", 4);
final EnvelopeResponse envelopeResponse = new EnvelopeResponse(
createTopicsResponse.serialize(version, header.toResponseHeader()),
Errors.NONE
);
CreateTopicsResponse extractedResponse = (CreateTopicsResponse) CreateTopicsResponse.deserializeBody(
envelopeResponse.responseData(), header);
assertEquals(createTopicsResponse.data(), extractedResponse.data());
}
}

View File

@ -17,15 +17,8 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
import java.util.Collection;
@ -37,22 +30,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ApiVersionsResponseTest {
@Test
public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
final ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(
10,
RecordBatch.MAGIC_VALUE_V1,
Features.emptySupportedFeatures());
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
assertTrue(response.data.supportedFeatures().isEmpty());
assertTrue(response.data.finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data.finalizedFeaturesEpoch());
}
@Test
public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.enabledApis()));
@ -61,19 +40,6 @@ public class ApiVersionsResponseTest {
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.finalizedFeaturesEpoch());
}
@Test
public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordBatch.CURRENT_MAGIC_VALUE,
Features.emptySupportedFeatures());
assertEquals(new HashSet<>(ApiKeys.enabledApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
assertTrue(response.data.supportedFeatures().isEmpty());
assertTrue(response.data.finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data.finalizedFeaturesEpoch());
}
@Test
public void shouldHaveCorrectDefaultApiVersionsResponse() {
Collection<ApiVersionsResponseKey> apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.apiKeys();
@ -103,37 +69,6 @@ public class ApiVersionsResponseTest {
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.finalizedFeaturesEpoch());
}
@Test
public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(
10,
RecordBatch.MAGIC_VALUE_V1,
Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4)))),
Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange((short) 2, (short) 3)))),
10);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
assertEquals(1, response.data.supportedFeatures().size());
SupportedFeatureKey sKey = response.data.supportedFeatures().find("feature");
assertNotNull(sKey);
assertEquals(1, sKey.minVersion());
assertEquals(4, sKey.maxVersion());
assertEquals(1, response.data.finalizedFeatures().size());
FinalizedFeatureKey fKey = response.data.finalizedFeatures().find("feature");
assertNotNull(fKey);
assertEquals(2, fKey.minVersionLevel());
assertEquals(3, fKey.maxVersionLevel());
assertEquals(10, response.data.finalizedFeaturesEpoch());
}
private void verifyApiKeysForMagic(final ApiVersionsResponse response, final byte maxMagic) {
for (final ApiVersionsResponseKey version : response.data.apiKeys()) {
assertTrue(ApiKeys.forId(version.apiKey()).minRequiredInterBrokerMagic <= maxMagic);
}
}
private Set<ApiKeys> apiKeysInResponse(final ApiVersionsResponse apiVersions) {
final Set<ApiKeys> apiKeys = new HashSet<>();

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.EnvelopeRequestData;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
public class EnvelopeRequestTest {
@Test
public void testGetPrincipal() {
KafkaPrincipal kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "principal", true);
DefaultKafkaPrincipalBuilder kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null);
EnvelopeRequest.Builder requestBuilder = new EnvelopeRequest.Builder(ByteBuffer.allocate(0),
kafkaPrincipalBuilder.serialize(kafkaPrincipal), "client-address".getBytes());
EnvelopeRequest request = requestBuilder.build(EnvelopeRequestData.HIGHEST_SUPPORTED_VERSION);
assertEquals(kafkaPrincipal, kafkaPrincipalBuilder.deserialize(request.principalData()));
}
}

View File

@ -42,7 +42,7 @@ public class RequestContextTest {
RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", correlationId);
RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY);
new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY, false);
assertEquals(0, context.apiVersion());
// Write some garbage to the request buffer. This should be ignored since we will treat
@ -78,5 +78,4 @@ public class RequestContextTest {
assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data.errorCode());
assertTrue(response.data.apiKeys().isEmpty());
}
}

View File

@ -212,6 +212,33 @@ public class DefaultKafkaPrincipalBuilderTest {
verify(kerberosShortNamer, atLeastOnce()).shortName(any());
}
@Test
public void testPrincipalBuilderSerde() throws Exception {
SaslServer server = mock(SaslServer.class);
KerberosShortNamer kerberosShortNamer = mock(KerberosShortNamer.class);
when(server.getMechanismName()).thenReturn(SaslConfigs.GSSAPI_MECHANISM);
when(server.getAuthorizationID()).thenReturn("foo/host@REALM.COM");
when(kerberosShortNamer.shortName(any())).thenReturn("foo");
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, null);
KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals("foo", principal.getName());
byte[] serializedPrincipal = builder.serialize(principal);
KafkaPrincipal deserializedPrincipal = builder.deserialize(serializedPrincipal);
assertEquals(principal, deserializedPrincipal);
builder.close();
verify(server, atLeastOnce()).getMechanismName();
verify(server, atLeastOnce()).getAuthorizationID();
verify(kerberosShortNamer, atLeastOnce()).shortName(any());
}
private static class DummyPrincipal implements Principal {
private final String name;

View File

@ -19,7 +19,11 @@ package kafka.api
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.record.RecordVersion
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{RecordBatch, RecordVersion}
import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse}
import org.apache.kafka.common.requests.ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE
/**
* This class contains the different Kafka versions.
@ -135,6 +139,47 @@ object ApiVersion {
case _ => throw new IllegalArgumentException(s"Invalid message format version $recordVersion")
}
}
def apiVersionsResponse(throttleTimeMs: Int,
maxMagic: Byte,
latestSupportedFeatures: Features[SupportedVersionRange]): ApiVersionsResponse = {
apiVersionsResponse(
throttleTimeMs,
maxMagic,
latestSupportedFeatures,
Features.emptyFinalizedFeatures,
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH
)
}
def apiVersionsResponse(throttleTimeMs: Int,
maxMagic: Byte,
latestSupportedFeatures: Features[SupportedVersionRange],
finalizedFeatures: Features[FinalizedVersionRange],
finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
val apiKeys = ApiVersionsResponse.defaultApiKeys(maxMagic)
if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE &&
throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME)
return new ApiVersionsResponse(
ApiVersionsResponse.createApiVersionsResponseData(
DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs,
Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode),
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch)
)
new ApiVersionsResponse(
ApiVersionsResponse.createApiVersionsResponseData(
throttleTimeMs,
Errors.NONE,
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch)
)
}
}
sealed trait ApiVersion extends Ordered[ApiVersion] {

View File

@ -80,8 +80,9 @@ object RequestChannel extends Logging {
val context: RequestContext,
val startTimeNanos: Long,
memoryPool: MemoryPool,
@volatile private var buffer: ByteBuffer,
metrics: RequestChannel.Metrics) extends BaseRequest {
@volatile var buffer: ByteBuffer,
metrics: RequestChannel.Metrics,
val envelope: Option[RequestChannel.Request] = None) extends BaseRequest {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
@volatile var requestDequeueTimeNanos = -1L
@ -94,19 +95,56 @@ object RequestChannel extends Logging {
@volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
val session = Session(context.principal, context.clientAddress)
private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
def header: RequestHeader = context.header
def sizeOfBodyInBytes: Int = bodyAndSize.size
//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
// most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
// some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
// to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
if (!header.apiKey.requiresDelayedAllocation) {
releaseBuffer()
}
def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
def isForwarded: Boolean = envelope.isDefined
def buildResponseSend(abstractResponse: AbstractResponse): Send = {
envelope match {
case Some(request) =>
val envelopeResponse = new EnvelopeResponse(
abstractResponse.serialize(header.apiVersion, header.toResponseHeader),
Errors.NONE
)
request.context.buildResponse(envelopeResponse)
case None =>
context.buildResponse(abstractResponse)
}
}
def responseString(response: AbstractResponse): Option[String] = {
if (RequestChannel.isRequestLoggingEnabled)
Some(response.toString(context.apiVersion))
else
None
}
def headerForLoggingOrThrottling(): RequestHeader = {
envelope match {
case Some(request) =>
request.context.header
case None =>
context.header
}
}
def requestDesc(details: Boolean): String = {
val forwardDescription = envelope.map { request =>
s"Forwarded request: ${request.context} "
}.getOrElse("")
s"$forwardDescription$header -- ${loggableRequest.toString(details)}"
}
def body[T <: AbstractRequest](implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
bodyAndSize.request match {
@ -250,9 +288,14 @@ object RequestChannel extends Logging {
}
def releaseBuffer(): Unit = {
if (buffer != null) {
memoryPool.release(buffer)
buffer = null
envelope match {
case Some(request) =>
request.releaseBuffer()
case None =>
if (buffer != null) {
memoryPool.release(buffer)
buffer = null
}
}
}
@ -261,7 +304,8 @@ object RequestChannel extends Logging {
s"session=$session, " +
s"listenerName=${context.listenerName}, " +
s"securityProtocol=${context.securityProtocol}, " +
s"buffer=$buffer)"
s"buffer=$buffer, " +
s"envelope=$envelope)"
}
@ -351,7 +395,7 @@ class RequestChannel(val queueSize: Int,
def sendResponse(response: RequestChannel.Response): Unit = {
if (isTraceEnabled) {
val requestHeader = response.request.header
val requestHeader = response.request.headerForLoggingOrThrottling()
val message = response match {
case sendResponse: SendResponse =>
s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes."

View File

@ -20,8 +20,7 @@ package kafka.network
import java.io.IOException
import java.net._
import java.nio.ByteBuffer
import java.nio.channels._
import java.nio.channels.{Selector => NSelector}
import java.nio.channels.{Selector => NSelector, _}
import java.util
import java.util.Optional
import java.util.concurrent._
@ -29,32 +28,31 @@ import java.util.concurrent.atomic._
import kafka.cluster.{BrokerEndPoint, EndPoint}
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
import kafka.security.CredentialProvider
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import kafka.utils.Implicits._
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
import org.apache.kafka.common.network.ClientInformation
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsRequest
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientInformation, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.slf4j.event.Level
import scala.collection._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, Buffer}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.util.control.ControlThrowable
/**
@ -262,7 +260,8 @@ class SocketServer(val config: KafkaConfig,
endpointOpt.foreach { endpoint =>
connectionQuotas.addListener(config, endpoint.listenerName)
val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get,
connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool, isPrivilegedListener = true)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
controlPlaneProcessorOpt = Some(controlPlaneProcessor)
val listenerProcessors = new ArrayBuffer[Processor]()
@ -285,8 +284,11 @@ class SocketServer(val config: KafkaConfig,
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && config.interBrokerListenerName == listenerName
for (_ <- 0 until newProcessorsPerListener) {
val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
listenerName, securityProtocol, memoryPool, isPrivilegedListener)
listenerProcessors += processor
dataPlaneRequestChannel.addProcessor(processor)
nextProcessorId += 1
@ -412,7 +414,7 @@ class SocketServer(val config: KafkaConfig,
// `protected` for test usage
protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
securityProtocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean): Processor = {
new Processor(id,
time,
config.socketRequestMaxBytes,
@ -427,6 +429,7 @@ class SocketServer(val config: KafkaConfig,
credentialProvider,
memoryPool,
logContext,
isPrivilegedListener = isPrivilegedListener,
allowDisabledApis = allowDisabledApis
)
}
@ -652,8 +655,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
}
/**
* Create a server socket to listen for connections on.
*/
* Create a server socket to listen for connections on.
*/
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val socketAddress =
if (host == null || host.trim.isEmpty)
@ -727,6 +730,12 @@ private[kafka] object Processor {
/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selector
*
* @param isPrivilegedListener The privileged listener flag is used as one factor to determine whether
* a certain request is forwarded or not. When the control plane is defined,
* the control plane processor would be fellow broker's choice for sending
* forwarding requests; if the control plane is not defined, the processor
* relying on the inter broker listener would be acting as the privileged listener.
*/
private[kafka] class Processor(val id: Int,
time: Time,
@ -743,6 +752,7 @@ private[kafka] class Processor(val id: Int,
memoryPool: MemoryPool,
logContext: LogContext,
connectionQueueSize: Int = ConnectionQueueSize,
isPrivilegedListener: Boolean = false,
allowDisabledApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private object ConnectionId {
@ -772,7 +782,7 @@ private[kafka] class Processor(val id: Int,
newGauge(IdlePercentMetricName, () => {
Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, metricTags))).fold(0.0)(m =>
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
},
},
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
Map(NetworkProcessorMetricTag -> id.toString)
@ -961,22 +971,41 @@ private[kafka] class Processor(val id: Int,
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol,
channel.channelMetadataRegistry.clientInformation)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
// and version. It is done here to avoid wiring things up to the api layer.
if (header.apiKey == ApiKeys.API_VERSIONS) {
val apiVersionsRequest = req.body[ApiVersionsRequest]
if (apiVersionsRequest.isValid) {
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
apiVersionsRequest.data.clientSoftwareVersion))
channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)
var req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
if (req.header.apiKey == ApiKeys.ENVELOPE) {
// Override the request context with the forwarded request context.
// The envelope's context will be preserved in the forwarded context
req = parseForwardedPrincipal(req, channel.principalSerde.asScala) match {
case Some(forwardedPrincipal) =>
buildForwardedRequestContext(req, forwardedPrincipal)
case None =>
val envelopeResponse = new EnvelopeResponse(Errors.PRINCIPAL_DESERIALIZATION_FAILURE)
sendEnvelopeResponse(req, envelopeResponse)
null
}
}
requestChannel.sendRequest(req)
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
if (req != null) {
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
// and version. It is done here to avoid wiring things up to the api layer.
if (header.apiKey == ApiKeys.API_VERSIONS) {
val apiVersionsRequest = req.body[ApiVersionsRequest]
if (apiVersionsRequest.isValid) {
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
apiVersionsRequest.data.clientSoftwareVersion))
}
}
requestChannel.sendRequest(req)
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
}
}
}
case None =>
@ -993,6 +1022,66 @@ private[kafka] class Processor(val id: Int,
selector.clearCompletedReceives()
}
private def sendEnvelopeResponse(
envelopeRequest: RequestChannel.Request,
envelopeResponse: EnvelopeResponse
): Unit = {
val envelopResponseSend = envelopeRequest.context.buildResponse(envelopeResponse)
enqueueResponse(new RequestChannel.SendResponse(
envelopeRequest,
envelopResponseSend,
None,
None
))
}
private def parseForwardedPrincipal(
envelopeRequest: RequestChannel.Request,
principalSerde: Option[KafkaPrincipalSerde]
): Option[KafkaPrincipal] = {
val envelope = envelopeRequest.body[EnvelopeRequest]
try {
principalSerde.map { serde =>
serde.deserialize(envelope.principalData())
}
} catch {
case e: Exception =>
warn(s"Failed to deserialize principal from envelope request $envelope", e)
None
}
}
private def buildForwardedRequestContext(
envelopeRequest: RequestChannel.Request,
forwardedPrincipal: KafkaPrincipal
): RequestChannel.Request = {
val envelope = envelopeRequest.body[EnvelopeRequest]
val forwardedRequestBuffer = envelope.requestData.duplicate()
val forwardedHeader = parseRequestHeader(forwardedRequestBuffer)
val forwardedClientAddress = InetAddress.getByAddress(envelope.clientAddress)
val forwardedContext = new RequestContext(
forwardedHeader,
envelopeRequest.context.connectionId,
forwardedClientAddress,
forwardedPrincipal,
listenerName,
securityProtocol,
ClientInformation.EMPTY,
isPrivilegedListener
)
new RequestChannel.Request(
processor = id,
context = forwardedContext,
startTimeNanos = envelopeRequest.startTimeNanos,
memoryPool,
forwardedRequestBuffer,
requestChannel.metrics,
Some(envelopeRequest)
)
}
private def processCompletedSends(): Unit = {
selector.completedSends.forEach { send =>
try {
@ -1342,17 +1431,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
def recordAndGetListenerThrottleTime(minThrottleTimeMs: Int): Int = {
maxConnectionsPerListener
.get(listenerName)
.map { listenerQuota =>
val listenerThrottleTimeMs = recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)
val throttleTimeMs = math.max(minThrottleTimeMs, listenerThrottleTimeMs)
// record throttle time due to hitting connection rate quota
if (throttleTimeMs > 0) {
listenerQuota.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs)
.get(listenerName)
.map { listenerQuota =>
val listenerThrottleTimeMs = recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)
val throttleTimeMs = math.max(minThrottleTimeMs, listenerThrottleTimeMs)
// record throttle time due to hitting connection rate quota
if (throttleTimeMs > 0) {
listenerQuota.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs)
}
throttleTimeMs
}
throttleTimeMs
}
.getOrElse(0)
.getOrElse(0)
}
if (protectedListener(listenerName)) {

View File

@ -349,8 +349,8 @@ class AclAuthorizer extends Authorizer with Logging {
}
def aclsAllowAccess = {
//we allow an operation if no acls are found and user has configured to allow all users
//when no acls are found or if no deny acls are found and at least one allow acls matches.
// we allow an operation if no acls are found and user has configured to allow all users
// when no acls are found or if no deny acls are found and at least one allow acls matches.
val acls = matchingAcls(resource.resourceType, resource.name)
isEmptyAclAndAuthorized(acls) || (!denyAclExists(acls) && allowAclExists(acls))
}
@ -459,7 +459,8 @@ class AclAuthorizer extends Authorizer with Logging {
val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
val refCount = action.resourceReferenceCount
s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
s"Principal = $principal is $authResult Operation = $operation " +
s"from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
}
if (authorized) {

View File

@ -386,7 +386,7 @@ class AdminManager(val config: KafkaConfig,
resourceToConfigNames.map { case resource =>
def allConfigs(config: AbstractConfig) = {
config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
config.originals.asScala.filter(_._2 != null) ++ config.nonInternalValues.asScala
}
def createResponseConfig(configs: Map[String, Any],
createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {

View File

@ -20,24 +20,28 @@ package kafka.server
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.network.RequestChannel
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.Node
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse}
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
trait BrokerToControllerChannelManager {
def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: RequestCompletionHandler): Unit
def forwardRequest(request: RequestChannel.Request, responseCallback: AbstractResponse => Unit): Unit
def start(): Unit
def shutdown(): Unit
@ -55,6 +59,7 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
time: Time,
metrics: Metrics,
config: KafkaConfig,
channelName: String,
threadNamePrefix: Option[String] = None) extends BrokerToControllerChannelManager with Logging {
private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]
private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
@ -68,6 +73,7 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
override def shutdown(): Unit = {
requestThread.shutdown()
requestThread.awaitShutdown()
info(s"Broker to controller channel manager for $channelName shutdown")
}
private[server] def newRequestThread = {
@ -90,7 +96,7 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
Selector.NO_IDLE_TIMEOUT_MS,
metrics,
time,
"BrokerToControllerChannel",
channelName,
Map("BrokerId" -> config.brokerId.toString).asJava,
false,
channelBuilder,
@ -129,6 +135,44 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
requestQueue.put(BrokerToControllerQueueItem(request, callback))
requestThread.wakeup()
}
def forwardRequest(
request: RequestChannel.Request,
responseCallback: AbstractResponse => Unit
): Unit = {
val principalSerde = request.context.principalSerde.asScala.getOrElse(
throw new IllegalArgumentException(s"Cannot deserialize principal from request $request " +
"since there is no serde defined")
)
val serializedPrincipal = principalSerde.serialize(request.context.principal)
val forwardRequestBuffer = request.buffer.duplicate()
forwardRequestBuffer.flip()
val envelopeRequest = new EnvelopeRequest.Builder(
forwardRequestBuffer,
serializedPrincipal,
request.context.clientAddress.getAddress
)
def onClientResponse(clientResponse: ClientResponse): Unit = {
val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
val envelopeError = envelopeResponse.error()
val response = if (envelopeError != Errors.NONE) {
// An envelope error indicates broker misconfiguration (e.g. the principal serde
// might not be defined on the receiving broker). In this case, we do not return
// the error directly to the client since it would not be expected. Instead we
// return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
// on the broker.
debug(s"Forwarded request $request failed with an error in envelope response $envelopeError")
request.body[AbstractRequest].getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception())
} else {
AbstractResponse.deserializeBody(envelopeResponse.responseData, request.header)
}
responseCallback(response)
}
requestQueue.put(BrokerToControllerQueueItem(envelopeRequest, onClientResponse))
requestThread.wakeup()
}
}
case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: AbstractRequest],
@ -155,8 +199,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
val request = RequestAndCompletionHandler(
activeController.get,
topRequest.request,
handleResponse(topRequest),
)
handleResponse(topRequest)
)
requestsToSend.enqueue(request)
}
requestsToSend

View File

@ -335,7 +335,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
def throttle(request: RequestChannel.Request, throttleTimeMs: Int, channelThrottlingCallback: Response => Unit): Unit = {
if (throttleTimeMs > 0) {
val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
val clientSensors = getOrCreateQuotaSensors(request.session, request.headerForLoggingOrThrottling().clientId)
clientSensors.throttleTimeSensor.record(throttleTimeMs)
val throttledChannel = new ThrottledChannel(request, time, throttleTimeMs, channelThrottlingCallback)
delayQueue.add(throttledChannel)

View File

@ -49,10 +49,9 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
@ -63,7 +62,6 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition
import org.apache.kafka.common.message.ListOffsetResponseData
import org.apache.kafka.common.message.ListOffsetResponseData.{ListOffsetPartitionResponse, ListOffsetTopicResponse}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
@ -81,6 +79,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
import org.apache.kafka.server.authorizer._
@ -91,7 +90,6 @@ import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Success, Try}
import kafka.coordinator.group.GroupOverview
/**
* Logic to handle the various Kafka requests
*/
@ -101,6 +99,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
val forwardingManager: BrokerToControllerChannelManager,
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
@ -126,6 +125,55 @@ class KafkaApis(val requestChannel: RequestChannel,
info("Shutdown complete.")
}
private def maybeHandleInvalidEnvelope(
envelope: RequestChannel.Request,
forwardedApiKey: ApiKeys
): Boolean = {
def sendEnvelopeError(error: Errors): Unit = {
sendErrorResponseMaybeThrottle(envelope, error.exception)
}
if (!config.metadataQuorumEnabled || !envelope.context.fromPrivilegedListener) {
// If the designated forwarding request is not coming from a privileged listener, or
// forwarding is not enabled yet, we would not handle the request.
closeConnection(envelope, Collections.emptyMap())
true
} else if (!authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
// Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
sendEnvelopeError(Errors.CLUSTER_AUTHORIZATION_FAILED)
true
} else if (!forwardedApiKey.forwardable) {
sendEnvelopeError(Errors.INVALID_REQUEST)
true
} else if (!controller.isActive) {
sendEnvelopeError(Errors.NOT_CONTROLLER)
true
} else {
false
}
}
private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
config.metadataQuorumEnabled && request.context.principalSerde.isPresent
}
private def maybeForward(
request: RequestChannel.Request,
handler: RequestChannel.Request => Unit
): Unit = {
def responseCallback(response: AbstractResponse): Unit = {
sendForwardedResponse(request, response)
}
if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
forwardingManager.forwardRequest(request, responseCallback)
} else {
// When the KIP-500 mode is off or the principal serde is undefined, forwarding is not supported,
// therefore requests are handled directly.
handler(request)
}
}
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
@ -133,6 +181,13 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
request.envelope.foreach { envelope =>
if (maybeHandleInvalidEnvelope(envelope, request.header.apiKey)) {
return
}
}
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
@ -153,7 +208,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.CREATE_TOPICS => maybeForward(request, handleCreateTopicsRequest)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
@ -166,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
case ApiKeys.ALTER_CONFIGS => maybeForward(request, handleAlterConfigsRequest)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
@ -178,16 +233,19 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForward(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForward(request, handleAlterClientQuotasRequest)
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentialsRequest(request)
case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
case ApiKeys.ENVELOPE => throw new IllegalStateException(
"Envelope request should not be handled directly in top level API")
// Until we are ready to integrate the Raft layer, these APIs are treated as
// unexpected and we just close the connection.
case ApiKeys.VOTE => closeConnection(request, util.Collections.emptyMap())
@ -1738,13 +1796,13 @@ class KafkaApis(val requestChannel: RequestChannel,
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeaturesOpt = finalizedFeatureCache.get
finalizedFeaturesOpt match {
case Some(finalizedFeatures) => ApiVersionsResponse.apiVersionsResponse(
case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
requestThrottleMs,
config.interBrokerProtocolVersion.recordVersion.value,
supportedFeatures,
finalizedFeatures.features,
finalizedFeatures.epoch)
case None => ApiVersionsResponse.apiVersionsResponse(
case None => ApiVersion.apiVersionsResponse(
requestThrottleMs,
config.interBrokerProtocolVersion.recordVersion.value,
supportedFeatures)
@ -1767,7 +1825,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, createResponse)
}
val createTopicsRequest = request.body[CreateTopicsRequest]
@ -1810,7 +1868,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
errors.forKeyValue { (topicName, error) =>
errors.foreach { case (topicName, error) =>
val result = results.find(topicName)
result.setErrorCode(error.error.code)
.setErrorMessage(error.message)
@ -1853,7 +1911,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, createResponse)
}
if (!controller.isActive) {
@ -1900,7 +1958,7 @@ class KafkaApis(val requestChannel: RequestChannel,
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, createResponse)
}
val deleteTopicRequest = request.body[DeleteTopicsRequest]
@ -2691,14 +2749,14 @@ class KafkaApis(val requestChannel: RequestChannel,
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(resource)
}
sendResponseMaybeThrottle(request, requestThrottleMs =>
new IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponse.toResponseData(requestThrottleMs,
(authorizedResult ++ unauthorizedResult).asJava)))
sendResponseMaybeThrottle(request, requestThrottleMs => new IncrementalAlterConfigsResponse(
requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
}
def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
val describeConfigsRequest = request.body[DescribeConfigsRequest]
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.toBuffer.partition { resource =>
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.partition { resource =>
ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
@ -3265,19 +3323,33 @@ class KafkaApis(val requestChannel: RequestChannel,
sendErrorResponseExemptThrottle(request, e)
}
private def sendForwardedResponse(
request: RequestChannel.Request,
response: AbstractResponse
): Unit = {
// For forwarded requests, we take the throttle time from the broker that
// the request was forwarded to
val throttleTimeMs = response.throttleTimeMs()
quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
sendResponse(request, Some(response), None)
}
// Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
// response immediately.
private def sendResponseMaybeThrottle(request: RequestChannel.Request,
createResponse: Int => AbstractResponse,
onComplete: Option[Send => Unit] = None): Unit = {
createResponse: Int => AbstractResponse): Unit = {
val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
sendResponse(request, Some(createResponse(throttleTimeMs)), onComplete)
// Only throttle non-forwarded requests
if (!request.isForwarded)
quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
sendResponse(request, Some(createResponse(throttleTimeMs)), None)
}
private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
// Only throttle non-forwarded requests or cluster authorization failures
if (error.isInstanceOf[ClusterAuthorizationException] || !request.isForwarded)
quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
sendErrorOrCloseConnection(request, error, throttleTimeMs)
}
@ -3291,15 +3363,15 @@ class KafkaApis(val requestChannel: RequestChannel,
* Throttle the channel if the controller mutations quota or the request quota have been violated.
* Regardless of throttling, send the response immediately.
*/
private def sendResponseMaybeThrottle(controllerMutationQuota: ControllerMutationQuota,
request: RequestChannel.Request,
createResponse: Int => AbstractResponse,
onComplete: Option[Send => Unit]): Unit = {
private def sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota: ControllerMutationQuota,
request: RequestChannel.Request,
createResponse: Int => AbstractResponse): Unit = {
val timeMs = time.milliseconds
val controllerThrottleTimeMs = controllerMutationQuota.throttleTime
val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
val maxThrottleTimeMs = Math.max(controllerThrottleTimeMs, requestThrottleTimeMs)
if (maxThrottleTimeMs > 0) {
// Only throttle non-forwarded requests
if (maxThrottleTimeMs > 0 && !request.isForwarded) {
request.apiThrottleTimeMs = maxThrottleTimeMs
if (controllerThrottleTimeMs > requestThrottleTimeMs) {
quotas.controllerMutation.throttle(request, controllerThrottleTimeMs, requestChannel.sendResponse)
@ -3308,7 +3380,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
sendResponse(request, Some(createResponse(maxThrottleTimeMs)), onComplete)
sendResponse(request, Some(createResponse(maxThrottleTimeMs)), None)
}
private def sendResponseExemptThrottle(request: RequestChannel.Request,
@ -3352,11 +3424,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val response = responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
new RequestChannel.SendResponse(
request,
request.buildResponseSend(response),
request.responseString(response),
onComplete
)
case None =>
new RequestChannel.NoOpResponse(request)
}

View File

@ -352,6 +352,8 @@ object KafkaConfig {
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
val EnableMetadataQuorumProp = "enable.metadata.quorum"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
/** ********* Socket Server Configuration ***********/
@ -1026,6 +1028,9 @@ object KafkaConfig {
.define(ConnectionSetupTimeoutMsProp, LONG, Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc)
.define(ConnectionSetupTimeoutMaxMsProp, LONG, Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
// Experimental flag to turn on APIs required for the internal metadata quorum (KIP-500)
.defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@ -1327,6 +1332,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
if (this eq currentConfig) super.originals else currentConfig.originals
override def values: util.Map[String, _] =
if (this eq currentConfig) super.values else currentConfig.values
override def nonInternalValues: util.Map[String, _] =
if (this eq currentConfig) super.nonInternalValues else currentConfig.values
override def originalsStrings: util.Map[String, String] =
if (this eq currentConfig) super.originalsStrings else currentConfig.originalsStrings
override def originalsWithPrefix(prefix: String): util.Map[String, AnyRef] =
@ -1556,6 +1563,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion >= KAFKA_2_7_IV0
/** ********* Experimental metadata quorum configuration ***********/
def metadataQuorumEnabled = getBoolean(KafkaConfig.EnableMetadataQuorumProp)
/** ********* Group coordinator configuration ***********/
val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)

View File

@ -35,7 +35,7 @@ import kafka.network.SocketServer
import kafka.security.CredentialProvider
import kafka.utils._
import kafka.zk.{BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils, CommonClientConfigs}
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, CommonClientConfigs, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter, _}
@ -168,7 +168,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var kafkaController: KafkaController = null
var brokerToControllerChannelManager: BrokerToControllerChannelManager = null
var forwardingManager: BrokerToControllerChannelManager = null
var alterIsrChannelManager: BrokerToControllerChannelManager = null
var kafkaScheduler: KafkaScheduler = null
@ -299,14 +301,19 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider)
//
// Note that we allow the use of disabled APIs when experimental support for
// the internal metadata quorum has been enabled
socketServer = new SocketServer(config, metrics, time, credentialProvider,
allowDisabledApis = config.metadataQuorumEnabled)
socketServer.startup(startProcessingRequests = false)
/* start replica manager */
brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
alterIsrChannelManager = new BrokerToControllerChannelManagerImpl(
metadataCache, time, metrics, config, "alterIsrChannel", threadNamePrefix)
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
brokerToControllerChannelManager.start()
alterIsrChannelManager.start()
val brokerInfo = createBrokerInfo
val brokerEpoch = zkClient.registerBroker(brokerInfo)
@ -322,6 +329,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
kafkaController.startup()
if (config.metadataQuorumEnabled) {
/* start forwarding manager */
forwardingManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, "forwardingChannel", threadNamePrefix)
forwardingManager.start()
}
adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
@ -354,7 +367,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* start processing requests */
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
kafkaController, forwardingManager, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
@ -362,7 +375,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
kafkaController, forwardingManager, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
@ -427,7 +440,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
val alterIsrManager = new AlterIsrManagerImpl(brokerToControllerChannelManager, kafkaScheduler,
val alterIsrManager = new AlterIsrManagerImpl(alterIsrChannelManager, kafkaScheduler,
time, config.brokerId, () => kafkaController.brokerEpoch)
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager)
@ -709,8 +722,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)
if (brokerToControllerChannelManager != null)
CoreUtils.swallow(brokerToControllerChannelManager.shutdown(), this)
if (alterIsrChannelManager != null)
CoreUtils.swallow(alterIsrChannelManager.shutdown(), this)
if (forwardingManager != null)
CoreUtils.swallow(forwardingManager.shutdown(), this)
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)

View File

@ -320,7 +320,7 @@ object CoreUtils {
@nowarn("cat=unused") // see below for explanation
def groupMapReduce[T, K, B](elements: Iterable[T])(key: T => K)(f: T => B)(reduce: (B, B) => B): Map[K, B] = {
// required for Scala 2.12 compatibility, unused in Scala 2.13 and hence we need to suppres the unused warning
// required for Scala 2.12 compatibility, unused in Scala 2.13 and hence we need to suppress the unused warning
import scala.collection.compat._
elements.groupMapReduce(key)(f)(reduce)
}

View File

@ -50,12 +50,18 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
for (apiKey <- enabledApis) {
val apiVersion = nodeApiVersions.apiVersion(apiKey)
assertNotNull(apiVersion)
val versionRangeStr =
if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
val terminator = if (apiKey == enabledApis.last) "" else ","
val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
// Admin client should not see ENVELOPE supported versions as its a broker-internal API.
val usableVersionInfo = if (apiKey == ApiKeys.ENVELOPE) "UNSUPPORTED" else
s"$versionRangeStr [usable: $usableVersion]"
val terminator = if (apiKey == enabledApis.last) "" else ","
val line = s"\t${apiKey.name}(${apiKey.id}): $usableVersionInfo$terminator"
assertTrue(lineIter.hasNext)
assertEquals(line, lineIter.next())
}

View File

@ -342,7 +342,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertFalse(maxMessageBytes2.isSensitive)
assertFalse(maxMessageBytes2.isReadOnly)
assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size)
assertEquals(servers(1).config.nonInternalValues.size, configs.get(brokerResource1).entries.size)
assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value)
@ -363,7 +363,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertFalse(compressionType.isSensitive)
assertFalse(compressionType.isReadOnly)
assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size)
assertEquals(servers(2).config.nonInternalValues.size, configs.get(brokerResource2).entries.size)
assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
assertEquals(servers(2).config.logCleanerThreads.toString,
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)

View File

@ -98,6 +98,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile2), "kafka")
private val invalidSslProperties = invalidSslConfigs
def addExtraProps(props: Properties): Unit = {
}
@Before
override def setUp(): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
@ -123,6 +126,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString)
addExtraProps(props)
props ++= sslProperties1
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal))

View File

@ -54,7 +54,6 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]],
authorizer: Option[String] = None): Unit = {
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
brokerConfigs.foreach(p => p.setProperty("auto.leader.rebalance.enable", "false"))
authorizer match {
@ -333,11 +332,11 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
fail();
fail()
} catch {
case e: AdminCommandFailedException =>
assertEquals("Not authorized to perform leader election", e.getMessage)
assertTrue(e.getCause().isInstanceOf[ClusterAuthorizationException])
assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
// Check we still have the same leader
assertEquals(leader, awaitLeader(testPartition))
} finally {

View File

@ -17,9 +17,17 @@
package kafka.api
import org.apache.kafka.common.record.RecordVersion
import org.junit.Test
import java.util
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.{RecordBatch, RecordVersion}
import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse}
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.Test
import scala.jdk.CollectionConverters._
class ApiVersionTest {
@ -96,6 +104,17 @@ class ApiVersionTest {
assertEquals(KAFKA_2_4_IV1, ApiVersion("2.4"))
assertEquals(KAFKA_2_4_IV0, ApiVersion("2.4-IV0"))
assertEquals(KAFKA_2_4_IV1, ApiVersion("2.4-IV1"))
assertEquals(KAFKA_2_5_IV0, ApiVersion("2.5"))
assertEquals(KAFKA_2_5_IV0, ApiVersion("2.5-IV0"))
assertEquals(KAFKA_2_6_IV0, ApiVersion("2.6"))
assertEquals(KAFKA_2_6_IV0, ApiVersion("2.6-IV0"))
assertEquals(KAFKA_2_7_IV2, ApiVersion("2.7"))
assertEquals(KAFKA_2_7_IV0, ApiVersion("2.7-IV0"))
assertEquals(KAFKA_2_7_IV1, ApiVersion("2.7-IV1"))
assertEquals(KAFKA_2_7_IV2, ApiVersion("2.7-IV2"))
}
@Test
@ -140,6 +159,9 @@ class ApiVersionTest {
assertEquals("2.3", KAFKA_2_3_IV0.shortVersion)
assertEquals("2.3", KAFKA_2_3_IV1.shortVersion)
assertEquals("2.4", KAFKA_2_4_IV0.shortVersion)
assertEquals("2.5", KAFKA_2_5_IV0.shortVersion)
assertEquals("2.6", KAFKA_2_6_IV0.shortVersion)
assertEquals("2.7", KAFKA_2_7_IV2.shortVersion)
}
@Test
@ -149,4 +171,90 @@ class ApiVersionTest {
assertEquals(ApiVersion.allVersions.size, apiVersions.length)
}
@Test
def shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue(): Unit = {
val response = ApiVersion.apiVersionsResponse(
10,
RecordBatch.MAGIC_VALUE_V1,
Features.emptySupportedFeatures
)
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
assertEquals(10, response.throttleTimeMs)
assertTrue(response.data.supportedFeatures.isEmpty)
assertTrue(response.data.finalizedFeatures.isEmpty)
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data.finalizedFeaturesEpoch)
}
@Test
def shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(): Unit = {
val response = ApiVersion.apiVersionsResponse(
10,
RecordBatch.MAGIC_VALUE_V1,
Features.supportedFeatures(
Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange(1.toShort, 4.toShort)))),
Features.finalizedFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange(2.toShort, 3.toShort)))),
10
)
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
assertEquals(10, response.throttleTimeMs)
assertEquals(1, response.data.supportedFeatures.size)
val sKey = response.data.supportedFeatures.find("feature")
assertNotNull(sKey)
assertEquals(1, sKey.minVersion)
assertEquals(4, sKey.maxVersion)
assertEquals(1, response.data.finalizedFeatures.size)
val fKey = response.data.finalizedFeatures.find("feature")
assertNotNull(fKey)
assertEquals(2, fKey.minVersionLevel)
assertEquals(3, fKey.maxVersionLevel)
assertEquals(10, response.data.finalizedFeaturesEpoch)
}
private def verifyApiKeysForMagic(response: ApiVersionsResponse, maxMagic: Byte): Unit = {
for (version <- response.data.apiKeys.asScala) {
assertTrue(ApiKeys.forId(version.apiKey).minRequiredInterBrokerMagic <= maxMagic)
}
}
@Test
def shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(): Unit = {
val response = ApiVersion.apiVersionsResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordBatch.CURRENT_MAGIC_VALUE,
Features.emptySupportedFeatures
)
assertEquals(new util.HashSet[ApiKeys](ApiKeys.enabledApis), apiKeysInResponse(response))
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs)
assertTrue(response.data.supportedFeatures.isEmpty)
assertTrue(response.data.finalizedFeatures.isEmpty)
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data.finalizedFeaturesEpoch)
}
@Test
def testMetadataQuorumApisAreDisabled(): Unit = {
val response = ApiVersion.apiVersionsResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordBatch.CURRENT_MAGIC_VALUE,
Features.emptySupportedFeatures
)
// Ensure that APIs needed for the internal metadata quorum (KIP-500)
// are not exposed through ApiVersions until we are ready for them
val exposedApis = apiKeysInResponse(response)
assertFalse(exposedApis.contains(ApiKeys.ENVELOPE))
assertFalse(exposedApis.contains(ApiKeys.VOTE))
assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH))
assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH))
assertFalse(exposedApis.contains(ApiKeys.DESCRIBE_QUORUM))
}
private def apiKeysInResponse(apiVersions: ApiVersionsResponse) = {
val apiKeys = new util.HashSet[ApiKeys]
for (version <- apiVersions.data.apiKeys.asScala) {
apiKeys.add(ApiKeys.forId(version.apiKey))
}
apiKeys
}
}

View File

@ -46,8 +46,9 @@ class RequestChannelTest {
val sensitiveValue = "secret"
def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues: Map[String, String]): Unit = {
val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.singletonMap(resource,
new Config(entries.asJavaCollection)), true).build())
val alterConfigs = request(new AlterConfigsRequest.Builder(
Collections.singletonMap(resource, new Config(entries.asJavaCollection)), true).build())
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
assertEquals(expectedValues, toMap(loggedConfig))
@ -85,7 +86,8 @@ class RequestChannelTest {
verifyConfig(topicResource, Seq(customConfig), Map(customConfig.name -> Password.HIDDEN))
// Verify empty request
val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.emptyMap[ConfigResource, Config], true).build())
val alterConfigs = request(new AlterConfigsRequest.Builder(
Collections.emptyMap[ConfigResource, Config], true).build())
assertEquals(Collections.emptyMap, alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
}
@ -182,7 +184,8 @@ class RequestChannelTest {
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
new ClientInformation("name", "version"))
new ClientInformation("name", "version"),
false)
}
private def toMap(config: Config): Map[String, String] = {

View File

@ -88,6 +88,8 @@ class SocketServerTest {
// Run the tests with TRACE logging to exercise request logging path
logLevelToRestore = kafkaLogger.getLevel
kafkaLogger.setLevel(Level.TRACE)
assertTrue(server.controlPlaneRequestChannelOpt.isEmpty)
}
@After
@ -302,6 +304,7 @@ class SocketServerTest {
val config = KafkaConfig.fromProps(testProps)
val testableServer = new TestableSocketServer(config)
testableServer.startup(startProcessingRequests = false)
val updatedEndPoints = config.advertisedListeners.map { endpoint =>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.toJava)
@ -518,15 +521,15 @@ class SocketServerTest {
val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) {
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
credentialProvider, memoryPool, new LogContext()) {
override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
selector = testableSelector
testableSelector
credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
selector = testableSelector
testableSelector
}
}
}
@ -1018,10 +1021,10 @@ class SocketServerTest {
var conn: Socket = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
credentialProvider, MemoryPool.NONE, new LogContext()) {
credentialProvider, MemoryPool.NONE, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
conn.close()
super.sendResponse(response, responseSend)
@ -1058,15 +1061,15 @@ class SocketServerTest {
@volatile var selector: TestableSelector = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
credentialProvider, memoryPool, new LogContext()) {
credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
selector = testableSelector
testableSelector
}
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
selector = testableSelector
testableSelector
}
}
}
}
@ -1133,8 +1136,8 @@ class SocketServerTest {
assertEquals(2, server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
server.dataPlaneRequestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2,
s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1,
"kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1,
"kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
def requestMetricMeters = KafkaYammerMetrics
.defaultRegistry
@ -1473,6 +1476,9 @@ class SocketServerTest {
props ++= sslServerProps
val testableServer = new TestableSocketServer(time = time)
testableServer.startup()
assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty)
val proxyServer = new ProxyServer(testableServer)
try {
val testableSelector = testableServer.testableSelector
@ -1602,7 +1608,7 @@ class SocketServerTest {
testableSelector.operationCounts.clear()
testableSelector.addFailure(SelectorOperation.Poll,
Some(new ControlThrowable() {}))
Some(new ControlThrowable() {}))
testableSelector.waitForOperations(SelectorOperation.Poll, 1)
testableSelector.waitForOperations(SelectorOperation.CloseSelector, 1)
@ -1633,7 +1639,7 @@ class SocketServerTest {
if (stackTraces.isEmpty)
errors.add(s"Acceptor thread not found, threads=${Thread.getAllStackTraces.keySet}")
stackTraces.exists { case (thread, stackTrace) =>
thread.getState == Thread.State.WAITING && stackTrace.contains("ArrayBlockingQueue")
thread.getState == Thread.State.WAITING && stackTrace.contains("ArrayBlockingQueue")
}
}
@ -1670,6 +1676,75 @@ class SocketServerTest {
}
}
@Test
def testControlPlaneAsPrivilegedListener(): Unit = {
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROLLER")
val config = KafkaConfig.fromProps(testProps)
withTestableServer(config, { testableServer =>
val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
localAddr = InetAddress.getLocalHost)
val sentRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
assertTrue(sentRequest.context.fromPrivilegedListener)
val plainSocket = connect(testableServer, localAddr = InetAddress.getLocalHost)
val plainRequest = sendAndReceiveRequest(plainSocket, testableServer)
assertFalse(plainRequest.context.fromPrivilegedListener)
})
}
@Test
def testInterBrokerListenerAsPrivilegedListener(): Unit = {
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0")
testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT")
testProps.put("inter.broker.listener.name", "INTERNAL")
val config = KafkaConfig.fromProps(testProps)
withTestableServer(config, { testableServer =>
val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
localAddr = InetAddress.getLocalHost)
val sentRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
assertTrue(sentRequest.context.fromPrivilegedListener)
val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"),
localAddr = InetAddress.getLocalHost)
val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
assertFalse(externalRequest.context.fromPrivilegedListener)
})
}
@Test
def testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener(): Unit = {
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROLLER")
testProps.put("inter.broker.listener.name", "INTERNAL")
val config = KafkaConfig.fromProps(testProps)
withTestableServer(config, { testableServer =>
val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
localAddr = InetAddress.getLocalHost)
val controlPlaneRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
assertTrue(controlPlaneRequest.context.fromPrivilegedListener)
val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
localAddr = InetAddress.getLocalHost)
val interBrokerRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
assertFalse(interBrokerRequest.context.fromPrivilegedListener)
val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"),
localAddr = InetAddress.getLocalHost)
val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
assertFalse(externalRequest.context.fromPrivilegedListener)
})
}
private def sslServerProps: Properties = {
val trustStoreFile = File.createTempFile("truststore", ".jks")
val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
@ -1683,7 +1758,7 @@ class SocketServerTest {
val testableServer = new TestableSocketServer(config)
testableServer.startup()
try {
testWithServer(testableServer)
testWithServer(testableServer)
} finally {
shutdownServerAndMetrics(testableServer)
assertEquals(0, testableServer.uncaughtExceptions)
@ -1739,21 +1814,21 @@ class SocketServerTest {
class TestableSocketServer(config : KafkaConfig = KafkaConfig.fromProps(props), val connectionQueueSize: Int = 20,
override val time: Time = Time.SYSTEM) extends SocketServer(config,
new Metrics, time, credentialProvider) {
new Metrics, time, credentialProvider) {
@volatile var selector: Option[TestableSelector] = None
@volatile var uncaughtExceptions = 0
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider,
memoryPool, new LogContext(), connectionQueueSize) {
memoryPool, new LogContext(), connectionQueueSize, isPrivilegedListener) {
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
selector = Some(testableSelector)
testableSelector
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
selector = Some(testableSelector)
testableSelector
}
override private[network] def processException(errorMessage: String, throwable: Throwable): Unit = {
@ -1771,11 +1846,11 @@ class SocketServerTest {
val selector = testableSelector
if (locallyClosed) {
TestUtils.waitUntilTrue(() => selector.allLocallyClosedChannels.contains(connectionId),
s"Channel not closed: $connectionId")
s"Channel not closed: $connectionId")
assertTrue("Unexpected disconnect notification", testableSelector.allDisconnectedChannels.isEmpty)
} else {
TestUtils.waitUntilTrue(() => selector.allDisconnectedChannels.contains(connectionId),
s"Disconnect notification not received: $connectionId")
s"Disconnect notification not received: $connectionId")
assertTrue("Channel closed locally", testableSelector.allLocallyClosedChannels.isEmpty)
}
val openCount = selector.allChannels.size - 1 // minus one for the channel just closed above
@ -1800,8 +1875,8 @@ class SocketServerTest {
}
class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics, metricTags: mutable.Map[String, String] = mutable.Map.empty)
extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
val failures = mutable.Map[SelectorOperation, Throwable]()
val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0)
@ -1902,7 +1977,7 @@ class SocketServerTest {
}
def runOp[T](operation: SelectorOperation, connectionId: Option[String],
onFailure: => Unit = {})(code: => T): T = {
onFailure: => Unit = {})(code: => T): T = {
// If a failure is set on `operation`, throw that exception even if `code` fails
try code
finally onOperation(operation, connectionId, onFailure)

View File

@ -1037,7 +1037,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short version, String clientId, int correlation
new RequestContext(header, "", clientAddress, principal, ListenerName.forSecurityProtocol(securityProtocol),
securityProtocol, ClientInformation.EMPTY)
securityProtocol, ClientInformation.EMPTY, false)
}
private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {

View File

@ -35,7 +35,9 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
}
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse): Unit = {
assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.", ApiKeys.enabledApis().size, apiVersionsResponse.data.apiKeys().size())
val enabledPublicApis = ApiKeys.enabledApis()
assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.",
enabledPublicApis.size(), apiVersionsResponse.data.apiKeys().size())
for (expectedApiVersion: ApiVersionsResponseKey <- ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.apiKeys().asScala) {
val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion)

View File

@ -69,7 +69,7 @@ class BaseClientQuotaManagerTest {
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
(request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics))
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import org.apache.kafka.common.protocol.Errors
import org.junit.Assert.assertEquals
import org.junit.Test
import scala.jdk.CollectionConverters._
class CreateTopicsRequestWithForwardingTest extends CreateTopicsRequestTest {
override def brokerPropertyOverrides(properties: Properties): Unit = {
super.brokerPropertyOverrides(properties)
properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
}
@Test
override def testNotController(): Unit = {
val req = topicsReq(Seq(topicReq("topic1")))
val response = sendCreateTopicRequest(req, notControllerSocketServer)
// With forwarding enabled, request could be forwarded to the active controller.
assertEquals(Map(Errors.NONE -> 1), response.errorCounts().asScala)
}
}

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,7 @@
package kafka.server
import java.net.InetAddress
import java.util
import java.util.concurrent.{Executors, Future, TimeUnit}
import java.util.{Collections, Optional, Properties}
@ -42,7 +43,7 @@ import org.apache.kafka.common.quota.ClientQuotaFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
@ -585,6 +586,18 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.UPDATE_FEATURES =>
new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData())
case ApiKeys.ENVELOPE =>
val requestHeader = new RequestHeader(
ApiKeys.ALTER_CLIENT_QUOTAS,
ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
"client-id",
0
)
val embedRequestData = new AlterClientQuotasRequest.Builder(
List.empty.asJava, false).build().serialize(requestHeader)
new EnvelopeRequest.Builder(embedRequestData, new Array[Byte](0),
InetAddress.getByName("192.168.1.1").getAddress)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@ -721,9 +734,17 @@ object RequestQuotaTest {
}.asJava
}
}
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
class TestPrincipalBuilder extends KafkaPrincipalBuilder with KafkaPrincipalSerde {
override def build(context: AuthenticationContext): KafkaPrincipal = {
principal
}
override def serialize(principal: KafkaPrincipal): Array[Byte] = {
new Array[Byte](0)
}
override def deserialize(bytes: Array[Byte]): KafkaPrincipal = {
principal
}
}
}

View File

@ -56,7 +56,7 @@ class ThrottledChannelExpirationTest {
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
(request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics))
}

View File

@ -96,7 +96,7 @@ public class AclAuthorizerBenchmark {
1, true, true));
context = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
"someclient", 1), "1", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
}
private void setFieldValue(Object obj, String fieldName, Object value) throws Exception {

View File

@ -23,6 +23,7 @@ import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.AdminManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerToControllerChannelManager;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
@ -97,6 +98,7 @@ public class MetadataRequestBenchmark {
private AdminManager adminManager = Mockito.mock(AdminManager.class);
private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
private KafkaController kafkaController = Mockito.mock(KafkaController.class);
private BrokerToControllerChannelManager brokerToControllerChannelManager = Mockito.mock(BrokerToControllerChannelManager.class);
private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
@ -173,6 +175,7 @@ public class MetadataRequestBenchmark {
groupCoordinator,
transactionCoordinator,
kafkaController,
brokerToControllerChannelManager,
kafkaZkClient,
brokerId,
new KafkaConfig(kafkaProps),
@ -202,8 +205,9 @@ public class MetadataRequestBenchmark {
RequestHeader header = RequestHeader.parse(buffer);
RequestContext context = new RequestContext(header, "1", null, principal,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer, requestChannelMetrics);
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer, requestChannelMetrics, Option.empty());
}
@Benchmark