MINOR: Allow KafkaApis to be configured for Raft controller quorums (#10045)

`KafkaApis` is configured differently when it is used in a broker with a Raft-based controller quorum than with a ZooKeeper quorum. For example, when using Raft, `ForwardingManager` is required rather than optional, and there is no `AdminManager`, `KafkaController`, or `KafkaZkClient`. This PR introduces `MetadataSupport` to abstract over the two possibilities: `ZkSupport` and `RaftSupport`. This provides a fluent way to decide what to do based on the type of support that `KafkaApis` has been configured with. Certain types of requests are not supported when using Raft (`AlterIsrRequest`, `UpdateMetadataRequest`, etc.), and `MetadataSupport` gives us an intuitive way to identify the constraints and requirements associated with each configuration and react accordingly.
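
The dispatch pattern can be sketched in isolation as follows (a minimal, self-contained model with hypothetical simplified types, not the real Kafka classes):

```scala
// Hypothetical, simplified model of the MetadataSupport idea (not the real Kafka types).
object MetadataSupportSketch {
  sealed trait Support {
    // Downcast to ZooKeeper-mode support, or fail with the caller-supplied exception.
    def requireZkOrThrow(createException: => Exception): Zk
  }
  final case class Zk(adminState: String) extends Support {
    override def requireZkOrThrow(createException: => Exception): Zk = this
  }
  case object Raft extends Support {
    override def requireZkOrThrow(createException: => Exception): Zk = throw createException
  }

  // A ZooKeeper-only "handler": on Raft it fails fast with a descriptive error
  // instead of dereferencing a KafkaController/KafkaZkClient that does not exist.
  def handleZkOnly(support: Support): Unit = {
    val zk = support.requireZkOrThrow(
      new UnsupportedOperationException("should always be forwarded when using Raft"))
    println(s"handled with ${zk.adminState}")
  }

  def main(args: Array[String]): Unit = {
    handleZkOnly(Zk("zk-backed admin state")) // prints "handled with zk-backed admin state"
    try handleZkOnly(Raft)                    // throws the supplied exception
    catch { case e: Exception => println(e.getMessage) }
  }
}
```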

Existing tests are sufficient to detect bugs and regressions.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
Ron Dagostino authored 2021-02-05 15:57:44 -05:00; committed by GitHub
parent a3305c4b8b
commit acf39fe94a
5 changed files with 341 additions and 68 deletions

core/src/main/scala/kafka/server/KafkaApis.scala

@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, ElectLeadersRequestOps, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.common.OffsetAndMetadata
import kafka.controller.{KafkaController, ReplicaAssignment}
import kafka.controller.ReplicaAssignment
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.AppendOrigin
@@ -38,7 +38,6 @@ import kafka.security.authorizer.AuthorizerUtils
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.acl.{AclBinding, AclOperation}
@@ -97,13 +96,10 @@ import scala.annotation.nowarn
* Logic to handle the various Kafka requests
*/
class KafkaApis(val requestChannel: RequestChannel,
val metadataSupport: MetadataSupport,
val replicaManager: ReplicaManager,
val adminManager: ZkAdminManager,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
val forwardingManager: Option[ForwardingManager],
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
val configRepository: ConfigRepository,
@@ -119,9 +115,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val brokerFeatures: BrokerFeatures,
val finalizedFeatureCache: FinalizedFeatureCache) extends ApiRequestHandler with Logging {
metadataSupport.ensureConsistentWith(config)
type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
val adminZkClient = new AdminZkClient(zkClient)
val configHelper = new ConfigHelper(metadataCache, config, configRepository)
private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
@@ -134,7 +131,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
forwardingManager.isDefined && request.context.principalSerde.isPresent
metadataSupport.forwardingManager.isDefined && request.context.principalSerde.isPresent
}
private def maybeForwardToController(
@@ -152,13 +149,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
forwardingManager match {
case Some(mgr) if !request.isForwarded && !controller.isActive =>
mgr.forwardRequest(request, responseCallback)
case _ =>
handler(request)
}
metadataSupport.maybeForward(request, handler, responseCallback)
}
/**
@@ -255,6 +246,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
@@ -262,11 +254,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch)) {
if (isBrokerEpochStale(zkSupport, leaderAndIsrRequest.brokerEpoch)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info("Received LeaderAndIsr request with broker epoch " +
s"${leaderAndIsrRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}")
s"${leaderAndIsrRequest.brokerEpoch} smaller than the current broker epoch ${zkSupport.controller.brokerEpoch}")
requestHelper.sendResponseExemptThrottle(request, leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_BROKER_EPOCH.exception))
} else {
val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
@@ -276,16 +268,17 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleStopReplicaRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch)) {
if (isBrokerEpochStale(zkSupport, stopReplicaRequest.brokerEpoch)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info("Received StopReplica request with broker epoch " +
s"${stopReplicaRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}")
s"${stopReplicaRequest.brokerEpoch} smaller than the current broker epoch ${zkSupport.controller.brokerEpoch}")
requestHelper.sendResponseExemptThrottle(request, new StopReplicaResponse(
new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
} else {
@@ -332,15 +325,16 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleUpdateMetadataRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body[UpdateMetadataRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch)) {
if (isBrokerEpochStale(zkSupport, updateMetadataRequest.brokerEpoch)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info("Received update metadata request with broker epoch " +
s"${updateMetadataRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}")
s"${updateMetadataRequest.brokerEpoch} smaller than the current broker epoch ${zkSupport.controller.brokerEpoch}")
requestHelper.sendResponseExemptThrottle(request,
new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
} else {
@@ -348,9 +342,9 @@ class KafkaApis(val requestChannel: RequestChannel,
if (deletedPartitions.nonEmpty)
groupCoordinator.handleDeletedPartitions(deletedPartitions)
if (adminManager.hasDelayedTopicOperations) {
if (zkSupport.adminManager.hasDelayedTopicOperations) {
updateMetadataRequest.partitionStates.forEach { partitionState =>
adminManager.tryCompleteDelayedTopicOperations(partitionState.topicName)
zkSupport.adminManager.tryCompleteDelayedTopicOperations(partitionState.topicName)
}
}
quotas.clientQuotaCallback.foreach { callback =>
@@ -373,6 +367,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
@@ -389,7 +384,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
requestHelper.sendResponseExemptThrottle(request, response)
}
controller.controlledShutdown(controlledShutdownRequest.data.brokerId, controlledShutdownRequest.data.brokerEpoch, controlledShutdownCallback)
zkSupport.controller.controlledShutdown(controlledShutdownRequest.data.brokerId, controlledShutdownRequest.data.brokerEpoch, controlledShutdownCallback)
}
/**
@@ -462,6 +457,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback(Map.empty)
else if (header.apiVersion == 0) {
// for version 0 always store offsets to ZK
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit requests"))
val responseInfo = authorizedTopicRequestInfo.map {
case (topicPartition, partitionData) =>
try {
@ -469,7 +465,7 @@ class KafkaApis(val requestChannel: RequestChannel,
&& partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
(topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
else {
zkClient.setOrCreateConsumerOffset(
zkSupport.zkClient.setOrCreateConsumerOffset(
offsetCommitRequest.data.groupId,
topicPartition,
partitionData.committedOffset)
@@ -1095,8 +1091,10 @@ class KafkaApis(val requestChannel: RequestChannel,
numPartitions: Int,
replicationFactor: Int,
properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
// the below will be replaced once KAFKA-9751 is implemented
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported("Auto topic creation"))
try {
adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe, config.usesTopicId)
zkSupport.adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe, config.usesTopicId)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
@@ -1308,6 +1306,7 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
else {
if (header.apiVersion == 0) {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch requests"))
val (authorizedPartitions, unauthorizedPartitions) = partitionByAuthorized(
offsetFetchRequest.partitions.asScala)
@@ -1317,7 +1316,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!metadataCache.contains(topicPartition))
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
else {
val payloadOpt = zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
payloadOpt match {
case Some(payload) =>
(topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong,
@@ -1751,7 +1750,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeaturesOpt = finalizedFeatureCache.get
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
val controllerApiVersions = metadataSupport.forwardingManager.flatMap(_.controllerApiVersions)
val apiVersionsResponse =
finalizedFeaturesOpt match {
@@ -1783,6 +1782,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
def sendResponseCallback(results: CreatableTopicResultCollection): Unit = {
@@ -1800,7 +1800,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val createTopicsRequest = request.body[CreateTopicsRequest]
val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
if (!controller.isActive) {
if (!zkSupport.controller.isActive) {
createTopicsRequest.data.topics.forEach { topic =>
results.add(new CreatableTopicResult().setName(topic.name)
.setErrorCode(Errors.NOT_CONTROLLER.code))
@@ -1852,7 +1852,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
sendResponseCallback(results)
}
adminManager.createTopics(
zkSupport.adminManager.createTopics(
createTopicsRequest.data.timeoutMs,
createTopicsRequest.data.validateOnly,
toCreate,
@@ -1863,6 +1863,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val createPartitionsRequest = request.body[CreatePartitionsRequest]
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3)
@@ -1884,7 +1885,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, createResponse)
}
if (!controller.isActive) {
if (!zkSupport.controller.isActive) {
val result = createPartitionsRequest.data.topics.asScala.map { topic =>
(topic.name, new ApiError(Errors.NOT_CONTROLLER, null))
}.toMap
@@ -1900,14 +1901,14 @@ class KafkaApis(val requestChannel: RequestChannel,
notDuped)(_.name)
val (queuedForDeletion, valid) = authorized.partition { topic =>
controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic.name)
zkSupport.controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic.name)
}
val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
unauthorized.map(_.name -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++
queuedForDeletion.map(_.name -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
adminManager.createPartitions(
zkSupport.adminManager.createPartitions(
createPartitionsRequest.data.timeoutMs,
valid,
createPartitionsRequest.data.validateOnly,
@@ -1917,6 +1918,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5)
def sendResponseCallback(results: DeletableTopicResultCollection): Unit = {
@@ -1934,7 +1936,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val deleteTopicRequest = request.body[DeleteTopicsRequest]
val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
val toDelete = mutable.Set[String]()
if (!controller.isActive) {
if (!zkSupport.controller.isActive) {
deleteTopicRequest.topics().forEach { topic =>
results.add(new DeletableTopicResult()
.setName(topic.name())
@@ -1957,7 +1959,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
results.add(new DeletableTopicResult()
.setName(name)
.setTopicId(topic.topicId()))
@@ -1995,7 +1997,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback(results)
}
adminManager.deleteTopics(
zkSupport.adminManager.deleteTopics(
deleteTopicRequest.data.timeoutMs,
toDelete,
controllerMutationQuota,
@@ -2491,6 +2493,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleCreateAcls(request: RequestChannel.Request): Unit = {
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
authHelper.authorizeClusterOperation(request, ALTER)
val createAclsRequest = request.body[CreateAclsRequest]
@@ -2542,6 +2545,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDeleteAcls(request: RequestChannel.Request): Unit = {
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
authHelper.authorizeClusterOperation(request, ALTER)
val deleteAclsRequest = request.body[DeleteAclsRequest]
authorizer match {
@@ -2603,6 +2607,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val alterConfigsRequest = request.body[AlterConfigsRequest]
val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
resource.`type` match {
@@ -2615,7 +2620,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
}
}
val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
val authorizedResult = zkSupport.adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(resource)
}
@@ -2635,6 +2640,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
authHelper.authorizeClusterOperation(request, ALTER)
val alterPartitionReassignmentsRequest = request.body[AlterPartitionReassignmentsRequest]
@@ -2672,10 +2678,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}.toMap
controller.alterPartitionReassignments(reassignments, sendResponseCallback)
zkSupport.controller.alterPartitionReassignments(reassignments, sendResponseCallback)
}
def handleListPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
authHelper.authorizeClusterOperation(request, DESCRIBE)
val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest]
@@ -2716,7 +2723,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case _ => None
}
controller.listPartitionReassignments(partitionsOpt, sendResponseCallback)
zkSupport.controller.listPartitionReassignments(partitionsOpt, sendResponseCallback)
}
private def configsAuthorizationApiError(resource: ConfigResource): ApiError = {
@@ -2729,6 +2736,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
val configs = alterConfigsRequest.data.resources.iterator.asScala.map { alterConfigResource =>
@@ -2750,7 +2758,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
val authorizedResult = zkSupport.adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(resource)
}
@@ -2834,6 +2842,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val createTokenRequest = request.body[CreateDelegationTokenRequest]
// the callback for sending a create token response
@@ -2868,6 +2877,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleRenewTokenRequest(request: RequestChannel.Request): Unit = {
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val renewTokenRequest = request.body[RenewDelegationTokenRequest]
// the callback for sending a renew token response
@@ -2895,6 +2905,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleExpireTokenRequest(request: RequestChannel.Request): Unit = {
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val expireTokenRequest = request.body[ExpireDelegationTokenRequest]
// the callback for sending a expire token response
@@ -2967,6 +2978,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleElectReplicaLeader(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
val electionRequest = request.body[ElectLeadersRequest]
@@ -3028,7 +3040,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
replicaManager.electLeaders(
controller,
zkSupport.controller,
partitions,
electionRequest.electionType,
sendResponseCallback(ApiError.NONE),
@@ -3096,10 +3108,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDescribeClientQuotasRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest]
if (authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter)
val result = zkSupport.adminManager.describeClientQuotas(describeClientQuotasRequest.filter)
val entriesData = result.iterator.map { case (quotaEntity, quotaValues) =>
val entityData = quotaEntity.entries.asScala.iterator.map { case (entityType, entityName) =>
@@ -3130,10 +3143,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
val result = adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
alterClientQuotasRequest.validateOnly)
val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
@@ -3160,10 +3174,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
val result = adminManager.describeUserScramCredentials(
val result = zkSupport.adminManager.describeUserScramCredentials(
Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
@@ -3174,13 +3189,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleAlterUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val alterUserScramCredentialsRequest = request.body[AlterUserScramCredentialsRequest]
if (!controller.isActive) {
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception))
} else if (authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
val result = adminManager.alterUserScramCredentials(
val result = zkSupport.adminManager.alterUserScramCredentials(
alterUserScramCredentialsRequest.data.upsertions().asScala, alterUserScramCredentialsRequest.data.deletions().asScala)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
@@ -3191,19 +3207,21 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
val alterIsrRequest = request.body[AlterIsrRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (!controller.isActive)
if (!zkSupport.controller.isActive)
requestHelper.sendResponseExemptThrottle(request, alterIsrRequest.getErrorResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception))
else
controller.alterIsrs(alterIsrRequest.data, alterIsrResp =>
zkSupport.controller.alterIsrs(alterIsrRequest.data, alterIsrResp =>
requestHelper.sendResponseExemptThrottle(request, new AlterIsrResponse(alterIsrResp))
)
}
def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = {
@@ -3226,12 +3244,12 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
sendResponseCallback(Left(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED)))
} else if (!controller.isActive) {
} else if (!zkSupport.controller.isActive) {
sendResponseCallback(Left(new ApiError(Errors.NOT_CONTROLLER)))
} else if (!config.isFeatureVersioningSupported) {
sendResponseCallback(Left(new ApiError(Errors.INVALID_REQUEST, "Feature versioning system is disabled.")))
} else {
controller.updateFeatures(updateFeaturesRequest, sendResponseCallback)
zkSupport.controller.updateFeatures(updateFeaturesRequest, sendResponseCallback)
}
}
@@ -3270,6 +3288,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleEnvelope(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
val envelope = request.body[EnvelopeRequest]
// If forwarding is not yet enabled or this request has been received on an invalid endpoint,
@@ -3289,7 +3308,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
return
} else if (!controller.isActive) {
} else if (!zkSupport.controller.isActive) {
requestHelper.sendErrorResponseMaybeThrottle(request, new NotControllerException(
s"Broker $brokerId is not the active controller"))
return
@@ -3446,17 +3465,16 @@ class KafkaApis(val requestChannel: RequestChannel,
request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}
private def isBrokerEpochStale(brokerEpochInRequest: Long): Boolean = {
private def isBrokerEpochStale(zkSupport: ZkSupport, brokerEpochInRequest: Long): Boolean = {
// Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
else {
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
// about the new broker epoch and sends a control request with this epoch before the broker learns about it
brokerEpochInRequest < controller.brokerEpoch
brokerEpochInRequest < zkSupport.controller.brokerEpoch
}
}
}
object KafkaApis {
@@ -3468,4 +3486,26 @@ object KafkaApis {
FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
.iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava)
}
// visible for testing
private[server] def shouldNeverReceive(request: RequestChannel.Request): Exception = {
new UnsupportedVersionException(s"Should never receive when using a Raft-based metadata quorum: ${request.header.apiKey()}")
}
// visible for testing
private[server] def shouldAlwaysForward(request: RequestChannel.Request): Exception = {
new UnsupportedVersionException(s"Should always be forwarded to the Active Controller when using a Raft-based metadata quorum: ${request.header.apiKey}")
}
private def unsupported(text: String): Exception = {
new UnsupportedVersionException(s"Unsupported when using a Raft-based metadata quorum: $text")
}
private def notYetSupported(request: RequestChannel.Request): Exception = {
notYetSupported(request.header.apiKey().toString)
}
private def notYetSupported(text: String): Exception = {
new UnsupportedVersionException(s"Not yet supported when using a Raft-based metadata quorum: $text")
}
}

core/src/main/scala/kafka/server/KafkaServer.scala

@@ -338,16 +338,17 @@ class KafkaServer(
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, forwardingManager, zkClient, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager)
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, forwardingManager, zkClient, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,

core/src/main/scala/kafka/server/MetadataSupport.scala (new file)

@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.controller.KafkaController
import kafka.network.RequestChannel
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.requests.AbstractResponse
sealed trait MetadataSupport {
/**
* Provide a uniform way of getting to the ForwardingManager, which is a shared concept
* despite being optional when using ZooKeeper and required when using Raft
*/
val forwardingManager: Option[ForwardingManager]
/**
* Return this instance downcast for use with ZooKeeper
*
* @param createException function to create an exception to throw
* @return this instance downcast for use with ZooKeeper
* @throws Exception if this instance is not for ZooKeeper
*/
def requireZkOrThrow(createException: => Exception): ZkSupport
/**
* Return this instance downcast for use with Raft
*
* @param createException function to create an exception to throw
* @return this instance downcast for use with Raft
* @throws Exception if this instance is not for Raft
*/
def requireRaftOrThrow(createException: => Exception): RaftSupport
/**
* Confirm that this instance is consistent with the given config
*
* @param config the config to check for consistency with this instance
* @throws IllegalStateException if there is an inconsistency (Raft for a ZooKeeper config or vice-versa)
*/
def ensureConsistentWith(config: KafkaConfig): Unit
def maybeForward(request: RequestChannel.Request,
handler: RequestChannel.Request => Unit,
responseCallback: Option[AbstractResponse] => Unit): Unit
}
case class ZkSupport(adminManager: ZkAdminManager,
controller: KafkaController,
zkClient: KafkaZkClient,
forwardingManager: Option[ForwardingManager]) extends MetadataSupport {
val adminZkClient = new AdminZkClient(zkClient)
override def requireZkOrThrow(createException: => Exception): ZkSupport = this
override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException
override def ensureConsistentWith(config: KafkaConfig): Unit = {
if (!config.requiresZookeeper) {
throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper")
}
}
override def maybeForward(request: RequestChannel.Request,
handler: RequestChannel.Request => Unit,
responseCallback: Option[AbstractResponse] => Unit): Unit = {
forwardingManager match {
case Some(mgr) if !request.isForwarded && !controller.isActive => mgr.forwardRequest(request, responseCallback)
case _ => handler(request)
}
}
}
case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
override def ensureConsistentWith(config: KafkaConfig): Unit = {
if (config.requiresZookeeper) {
throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
}
}
override def maybeForward(request: RequestChannel.Request,
handler: RequestChannel.Request => Unit,
responseCallback: Option[AbstractResponse] => Unit): Unit = {
if (!request.isForwarded) {
fwdMgr.forwardRequest(request, responseCallback)
} else {
handler(request) // will reject
}
}
}
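
To make the forwarding behavior easy to compare, here is a hedged sketch condensing the two `maybeForward` implementations above into a single decision function (plain booleans stand in for the real request and controller state):

```scala
object MaybeForwardSketch {
  // True when maybeForward forwards to the controller rather than handling locally,
  // per the ZkSupport and RaftSupport implementations above.
  def forwards(isRaft: Boolean,
               alreadyForwarded: Boolean,
               zkControllerActive: Boolean,
               zkHasForwardingManager: Boolean): Boolean =
    if (isRaft)
      !alreadyForwarded // RaftSupport: re-forwarded requests are handled (and rejected) locally
    else
      zkHasForwardingManager && !alreadyForwarded && !zkControllerActive // ZkSupport
}
```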

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

@@ -121,10 +121,19 @@ class KafkaApisTest {
def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
authorizer: Option[Authorizer] = None,
enableForwarding: Boolean = false,
configRepository: ConfigRepository = new CachedConfigRepository()): KafkaApis = {
configRepository: ConfigRepository = new CachedConfigRepository(),
raftSupport: Boolean = false): KafkaApis = {
val brokerFeatures = BrokerFeatures.createDefault()
val cache = new FinalizedFeatureCache(brokerFeatures)
val properties = TestUtils.createBrokerConfig(brokerId, "zk")
val properties = if (raftSupport) {
val properties = TestUtils.createBrokerConfig(brokerId, "")
properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
properties.put(KafkaConfig.ProcessRolesProp, "broker")
properties
} else {
TestUtils.createBrokerConfig(brokerId, "zk")
}
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
@@ -134,13 +143,10 @@ class KafkaApisTest {
None
new KafkaApis(requestChannel,
if (raftSupport) RaftSupport(forwardingManager) else ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt),
replicaManager,
adminManager,
groupCoordinator,
txnCoordinator,
controller,
forwardingManagerOpt,
zkClient,
brokerId,
new KafkaConfig(properties),
configRepository,
@@ -3197,4 +3203,124 @@ class KafkaApisTest {
}
private def createMockRequest(): RequestChannel.Request = {
val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
val requestHeader: RequestHeader = EasyMock.createNiceMock(classOf[RequestHeader])
expect(request.header).andReturn(requestHeader).anyTimes()
expect(requestHeader.apiKey()).andReturn(ApiKeys.values().head).anyTimes()
EasyMock.replay(request, requestHeader)
request
}
private def verifyShouldNeverHandle(handler: RequestChannel.Request => Unit): Unit = {
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
}
private def verifyShouldAlwaysForward(handler: RequestChannel.Request => Unit): Unit = {
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
}
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
}
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
}
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest)
}
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
}
@Test
def testRaftShouldNeverHandleAlterIsrRequest(): Unit = {
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
}
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope)
}
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
}
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls)
}
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls)
}
@Test
def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
}
@Test
def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
}
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures)
}
}

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java

@@ -37,6 +37,7 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
import kafka.server.ZkAdminManager;
import kafka.server.ZkSupport;
import kafka.server.metadata.CachedConfigRepository;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.memory.MemoryPool;
@@ -170,13 +171,10 @@ public class MetadataRequestBenchmark {
kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault();
return new KafkaApis(requestChannel,
new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty()),
replicaManager,
adminManager,
groupCoordinator,
transactionCoordinator,
kafkaController,
Option.empty(),
kafkaZkClient,
brokerId,
new KafkaConfig(kafkaProps),
new CachedConfigRepository(),