MINOR: Move a few more methods to AuthHelper (#9913)

And move some tests to `AuthHelperTest`.

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Ismael Juma 2021-01-16 06:44:02 -08:00 committed by GitHub
parent bfc96efa3a
commit 6f9e73cfd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 238 additions and 223 deletions

View File

@ -19,9 +19,9 @@ package kafka.server
import java.lang.{Byte => JByte} import java.lang.{Byte => JByte}
import java.util.Collections import java.util.Collections
import kafka.network.RequestChannel import kafka.network.RequestChannel
import kafka.security.authorizer.AclEntry import kafka.security.authorizer.AclEntry
import kafka.utils.CoreUtils
import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.errors.ClusterAuthorizationException import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.requests.RequestContext import org.apache.kafka.common.requests.RequestContext
@ -31,9 +31,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
class AuthHelper(authorizer: Option[Authorizer]) { class AuthHelper(authorizer: Option[Authorizer]) {
def authorize(requestContext: RequestContext, def authorize(requestContext: RequestContext,
@ -70,4 +70,64 @@ class AuthHelper(authorizer: Option[Authorizer]) {
} }
Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava) Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava)
} }
def authorizeByResourceType(requestContext: RequestContext, operation: AclOperation,
resourceType: ResourceType): Boolean = {
authorizer.forall { authZ =>
authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
}
}
def partitionSeqByAuthorized[T](requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resources: Seq[T],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true)(resourceName: T => String): (Seq[T], Seq[T]) = {
authorizer match {
case Some(_) =>
val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
resources, logIfAllowed, logIfDenied)(resourceName)
resources.partition(resource => authorizedResourceNames.contains(resourceName(resource)))
case None => (resources, Seq.empty)
}
}
def partitionMapByAuthorized[K, V](requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resources: Map[K, V],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true)(resourceName: K => String): (Map[K, V], Map[K, V]) = {
authorizer match {
case Some(_) =>
val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
resources.keySet, logIfAllowed, logIfDenied)(resourceName)
resources.partition { case (k, _) => authorizedResourceNames.contains(resourceName(k)) }
case None => (resources, Map.empty)
}
}
def filterByAuthorized[T](requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resources: Iterable[T],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = {
authorizer match {
case Some(authZ) =>
val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _)
val actions = resourceNameToCount.iterator.map { case (resourceName, count) =>
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied)
}.toBuffer
authZ.authorize(requestContext, actions.asJava).asScala
.zip(resourceNameToCount.keySet)
.collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED =>
resourceName
}.toSet
case None => resources.iterator.map(resourceName).toSet
}
}
} }

View File

@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kafka.admin.{AdminUtils, RackAwareMode} import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, ElectLeadersRequestOps, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0} import kafka.api.{ApiVersion, ElectLeadersRequestOps, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata import kafka.common.OffsetAndMetadata
import kafka.controller.{KafkaController, ReplicaAssignment} import kafka.controller.{KafkaController, ReplicaAssignment}
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult} import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult}
@ -74,7 +73,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.resource.{Resource, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
@ -253,25 +252,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId val correlationId = request.header.correlationId
val leaderAndIsrRequest = request.body[LeaderAndIsrRequest] val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit = {
// for each new leader or follower, call coordinator to handle consumer group migration.
// this callback is invoked under the replica state change lock to ensure proper order of
// leadership changes
updatedLeaders.foreach { partition =>
if (partition.topic == GROUP_METADATA_TOPIC_NAME)
groupCoordinator.onElection(partition.partitionId)
else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
}
updatedFollowers.foreach { partition =>
if (partition.topic == GROUP_METADATA_TOPIC_NAME)
groupCoordinator.onResignation(partition.partitionId)
else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
}
}
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch)) { if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended // When the broker restarts very quickly, it is possible for this broker to receive request intended
@ -280,7 +260,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"${leaderAndIsrRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}") s"${leaderAndIsrRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}")
requestHelper.sendResponseExemptThrottle(request, leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_BROKER_EPOCH.exception)) requestHelper.sendResponseExemptThrottle(request, leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_BROKER_EPOCH.exception))
} else { } else {
val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
requestHelper.sendResponseExemptThrottle(request, response) requestHelper.sendResponseExemptThrottle(request, response)
} }
} }
@ -453,7 +434,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
val topics = offsetCommitRequest.data.topics.asScala val topics = offsetCommitRequest.data.topics.asScala
val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
for (topicData <- topics) { for (topicData <- topics) {
for (partitionData <- topicData.partitions.asScala) { for (partitionData <- topicData.partitions.asScala) {
val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
@ -559,7 +540,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
// cache the result to avoid redundant authorization calls // cache the result to avoid redundant authorization calls
val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
produceRequest.data().topicData().asScala)(_.name()) produceRequest.data().topicData().asScala)(_.name())
produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition => produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
@ -724,7 +705,7 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchContext.foreachPartition { (topicPartition, partitionData) => fetchContext.foreachPartition { (topicPartition, partitionData) =>
partitionDatas += topicPartition -> partitionData partitionDatas += topicPartition -> partitionData
} }
val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic) val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
partitionDatas.foreach { case (topicPartition, data) => partitionDatas.foreach { case (topicPartition, data) =>
if (!authorizedTopics.contains(topicPartition.topic)) if (!authorizedTopics.contains(topicPartition.topic))
erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED) erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
@ -959,7 +940,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val clientId = request.header.clientId val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetsRequest] val offsetRequest = request.body[ListOffsetsRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context,
DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
@ -1023,7 +1004,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET) .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
} }
val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context,
DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
@ -1212,7 +1193,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else else
metadataRequest.topics.asScala.toSet metadataRequest.topics.asScala.toSet
val authorizedForDescribeTopics = filterByAuthorized(request.context, DESCRIBE, TOPIC, val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
topics, logIfDenied = !metadataRequest.isAllTopics)(identity) topics, logIfDenied = !metadataRequest.isAllTopics)(identity)
var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains) var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains)
var unauthorizedForCreateTopics = Set[String]() var unauthorizedForCreateTopics = Set[String]()
@ -1221,7 +1202,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
val authorizedForCreateTopics = filterByAuthorized(request.context, CREATE, TOPIC, val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
nonExistingTopics)(identity) nonExistingTopics)(identity)
unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics) unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics) authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
@ -1307,7 +1288,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetFetchRequest = request.body[OffsetFetchRequest] val offsetFetchRequest = request.body[OffsetFetchRequest]
def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], Seq[TopicPartition]) = def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], Seq[TopicPartition]) =
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic) authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic)
def createResponse(requestThrottleMs: Int): AbstractResponse = { def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse = val offsetFetchResponse =
@ -1351,7 +1332,7 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetFetchRequest.getErrorResponse(requestThrottleMs, error) offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
else { else {
// clients are not allowed to see offsets for topics that are not authorized for Describe // clients are not allowed to see offsets for topics that are not authorized for Describe
val (authorizedPartitionData, _) = partitionMapByAuthorized(request.context, val (authorizedPartitionData, _) = authHelper.partitionMapByAuthorized(request.context,
DESCRIBE, TOPIC, allPartitionData)(_.topic) DESCRIBE, TOPIC, allPartitionData)(_.topic)
new OffsetFetchResponse(requestThrottleMs, Errors.NONE, authorizedPartitionData.asJava) new OffsetFetchResponse(requestThrottleMs, Errors.NONE, authorizedPartitionData.asJava)
} }
@ -1632,7 +1613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val deleteGroupsRequest = request.body[DeleteGroupsRequest] val deleteGroupsRequest = request.body[DeleteGroupsRequest]
val groups = deleteGroupsRequest.data.groupsNames.asScala.distinct val groups = deleteGroupsRequest.data.groupsNames.asScala.distinct
val (authorizedGroups, unauthorizedGroups) = partitionSeqByAuthorized(request.context, DELETE, GROUP, val (authorizedGroups, unauthorizedGroups) = authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP,
groups)(identity) groups)(identity)
val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet) ++ val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet) ++
@ -1827,8 +1808,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val topics = createTopicsRequest.data.topics.asScala.map(_.name) val topics = createTopicsRequest.data.topics.asScala.map(_.name)
val authorizedTopics = val authorizedTopics =
if (hasClusterAuthorization) topics.toSet if (hasClusterAuthorization) topics.toSet
else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) else authHelper.filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, val authorizedForDescribeConfigs = authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
results.forEach { topic => results.forEach { topic =>
@ -1908,7 +1889,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.filter { _._2.size > 1 } .filter { _._2.size > 1 }
.keySet .keySet
val notDuped = topics.filterNot(topic => dupes.contains(topic.name)) val notDuped = topics.filterNot(topic => dupes.contains(topic.name))
val (authorized, unauthorized) = partitionSeqByAuthorized(request.context, ALTER, TOPIC, val (authorized, unauthorized) = authHelper.partitionSeqByAuthorized(request.context, ALTER, TOPIC,
notDuped)(_.name) notDuped)(_.name)
val (queuedForDeletion, valid) = authorized.partition { topic => val (queuedForDeletion, valid) = authorized.partition { topic =>
@ -1966,7 +1947,7 @@ class KafkaApis(val requestChannel: RequestChannel,
results.add(new DeletableTopicResult() results.add(new DeletableTopicResult()
.setName(topic)) .setName(topic))
} }
val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC, val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
results.asScala)(_.name) results.asScala)(_.name)
results.forEach { topic => results.forEach { topic =>
if (!authorizedTopics.contains(topic.name)) if (!authorizedTopics.contains(topic.name))
@ -2007,7 +1988,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]() val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
val topics = deleteRecordsRequest.data.topics.asScala val topics = deleteRecordsRequest.data.topics.asScala
val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC, topics)(_.name) val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, topics)(_.name)
val deleteTopicPartitions = topics.flatMap { deleteTopic => val deleteTopicPartitions = topics.flatMap { deleteTopic =>
deleteTopic.partitions.asScala.map { deletePartition => deleteTopic.partitions.asScala.map { deletePartition =>
new TopicPartition(deleteTopic.name, deletePartition.partitionIndex) -> deletePartition.offset new TopicPartition(deleteTopic.name, deletePartition.partitionIndex) -> deletePartition.offset
@ -2076,7 +2057,7 @@ class KafkaApis(val requestChannel: RequestChannel,
return return
} }
} else if (!authHelper.authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME, true, false) } else if (!authHelper.authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME, true, false)
&& !authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) { && !authHelper.authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) {
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return return
} }
@ -2279,7 +2260,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
val authorizedPartitions = mutable.Set[TopicPartition]() val authorizedPartitions = mutable.Set[TopicPartition]()
val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
for (topicPartition <- partitionsToAdd) { for (topicPartition <- partitionsToAdd) {
if (!authorizedTopics.contains(topicPartition.topic)) if (!authorizedTopics.contains(topicPartition.topic))
@ -2394,7 +2375,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
val committedOffsets = txnOffsetCommitRequest.offsets.asScala val committedOffsets = txnOffsetCommitRequest.offsets.asScala
val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic) val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic)
for ((topicPartition, commitedOffset) <- committedOffsets) { for ((topicPartition, commitedOffset) <- committedOffsets) {
if (!authorizedTopics.contains(topicPartition.topic)) if (!authorizedTopics.contains(topicPartition.topic))
@ -2571,7 +2552,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedTopics, unauthorizedTopics) = val (authorizedTopics, unauthorizedTopics) =
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false))
(topics, Seq.empty[OffsetForLeaderTopic]) (topics, Seq.empty[OffsetForLeaderTopic])
else partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic) else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic)
val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedTopics) val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedTopics)
val endOffsetsForUnauthorizedPartitions = unauthorizedTopics.map { offsetForLeaderTopic => val endOffsetsForUnauthorizedPartitions = unauthorizedTopics.map { offsetForLeaderTopic =>
@ -3037,7 +3018,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) { if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
val topics = offsetDeleteRequest.data.topics.asScala val topics = offsetDeleteRequest.data.topics.asScala
val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
val topicPartitionErrors = mutable.Map[TopicPartition, Errors]() val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
val topicPartitions = mutable.ArrayBuffer[TopicPartition]() val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
@ -3245,7 +3226,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"${config.interBrokerListenerName}.") s"${config.interBrokerListenerName}.")
requestHelper.closeConnection(request, Collections.emptyMap()) requestHelper.closeConnection(request, Collections.emptyMap())
return return
} else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { } else if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope")) s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
return return
@ -3350,82 +3331,6 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} }
// private package for testing
private[server] def authorize(requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resourceName: String,
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true,
refCount: Int = 1): Boolean = {
authorizer.forall { authZ =>
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
}
}
private def authorizeByResourceType(requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType): Boolean = {
authorizer.forall { authZ =>
authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
}
}
// private package for testing
private[server] def filterByAuthorized[T](requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resources: Iterable[T],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = {
authorizer match {
case Some(authZ) =>
val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _)
val actions = resourceNameToCount.iterator.map { case (resourceName, count) =>
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied)
}.toBuffer
authZ.authorize(requestContext, actions.asJava).asScala
.zip(resourceNameToCount.keySet)
.collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED =>
resourceName
}.toSet
case None => resources.iterator.map(resourceName).toSet
}
}
private def partitionSeqByAuthorized[T](requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resources: Seq[T],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true)(resourceName: T => String): (Seq[T], Seq[T]) = {
authorizer match {
case Some(_) =>
val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
resources, logIfAllowed, logIfDenied)(resourceName)
resources.partition(resource => authorizedResourceNames.contains(resourceName(resource)))
case None => (resources, Seq.empty)
}
}
private def partitionMapByAuthorized[K, V](requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resources: Map[K, V],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true)(resourceName: K => String): (Map[K, V], Map[K, V]) = {
authorizer match {
case Some(_) =>
val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
resources.keySet, logIfAllowed, logIfDenied)(resourceName)
resources.partition { case (k, _) => authorizedResourceNames.contains(resourceName(k)) }
case None => (resources, Map.empty)
}
}
private def updateRecordConversionStats(request: RequestChannel.Request, private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition, tp: TopicPartition,
conversionStats: RecordConversionStats): Unit = { conversionStats: RecordConversionStats): Unit = {

View File

@ -23,10 +23,9 @@ import java.nio.channels._
import java.util.concurrent.locks.{Lock, ReadWriteLock} import java.util.concurrent.locks.{Lock, ReadWriteLock}
import java.lang.management._ import java.lang.management._
import java.util.{Base64, Properties, UUID} import java.util.{Base64, Properties, UUID}
import com.typesafe.scalalogging.Logger import com.typesafe.scalalogging.Logger
import javax.management._
import javax.management._
import scala.collection._ import scala.collection._
import scala.collection.{Seq, mutable} import scala.collection.{Seq, mutable}
import kafka.cluster.EndPoint import kafka.cluster.EndPoint

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.net.InetAddress
import java.util
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.easymock.EasyMock._
import org.easymock.{EasyMock, IArgumentMatcher}
import org.junit.Assert._
import org.junit.Test
import scala.collection.Seq
import scala.jdk.CollectionConverters._
class AuthHelperTest {
import AuthHelperTest._
private val clientId = ""
@Test
def testAuthorize(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
val resourceName = "topic-1"
val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, clientId, 0)
val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
val expectedActions = Seq(
new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
1, true, true)
)
EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava))
.andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
.once()
EasyMock.replay(authorizer)
val result = new AuthHelper(Some(authorizer)).authorize(
requestContext, operation, resourceType, resourceName)
verify(authorizer)
assertEquals(true, result)
}
@Test
def testFilterByAuthorized(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
val resourceName1 = "topic-1"
val resourceName2 = "topic-2"
val resourceName3 = "topic-3"
val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
clientId, 0)
val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
val expectedActions = Seq(
new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
2, true, true),
new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL),
1, true, true),
new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL),
1, true, true),
)
EasyMock.expect(authorizer.authorize(
EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava)
)).andAnswer { () =>
val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
actions.map { action =>
if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
}.once()
EasyMock.replay(authorizer)
val result = new AuthHelper(Some(authorizer)).filterByAuthorized(
requestContext,
operation,
resourceType,
// Duplicate resource names should not trigger multiple calls to authorize
Seq(resourceName1, resourceName2, resourceName1, resourceName3)
)(identity)
verify(authorizer)
assertEquals(Set(resourceName1, resourceName3), result)
}
}
object AuthHelperTest {
/**
* Similar to `EasyMock.eq`, but matches if both lists have the same elements irrespective of ordering.
*/
def matchSameElements[T](list: java.util.List[T]): java.util.List[T] = {
EasyMock.reportMatcher(new IArgumentMatcher {
def matches(argument: Any): Boolean = argument match {
case l: java.util.List[_] => list.asScala.toSet == l.asScala.toSet
case _ => false
}
def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)")
})
null
}
}

View File

@ -69,7 +69,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.easymock.EasyMock._ import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher} import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{After, Test} import org.junit.{After, Test}
import org.mockito.{ArgumentMatchers, Mockito} import org.mockito.{ArgumentMatchers, Mockito}
@ -148,103 +148,6 @@ class KafkaApisTest {
cache) cache)
} }
@Test
def testAuthorize(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
val resourceName = "topic-1"
val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
clientId, 0)
val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
val expectedActions = Seq(
new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
1, true, true)
)
EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava))
.andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
.once()
EasyMock.replay(authorizer)
val result = createKafkaApis(authorizer = Some(authorizer)).authHelper.authorize(
requestContext, operation, resourceType, resourceName)
verify(authorizer)
assertEquals(true, result)
}
@Test
def testFilterByAuthorized(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
val resourceName1 = "topic-1"
val resourceName2 = "topic-2"
val resourceName3 = "topic-3"
val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
clientId, 0)
val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
val expectedActions = Seq(
new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
2, true, true),
new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL),
1, true, true),
new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL),
1, true, true),
)
EasyMock.expect(authorizer.authorize(
EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava)
)).andAnswer { () =>
val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
actions.map { action =>
if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
}.once()
EasyMock.replay(authorizer)
val result = createKafkaApis(authorizer = Some(authorizer)).filterByAuthorized(
requestContext,
operation,
resourceType,
// Duplicate resource names should not trigger multiple calls to authorize
Seq(resourceName1, resourceName2, resourceName1, resourceName3)
)(identity)
verify(authorizer)
assertEquals(Set(resourceName1, resourceName3), result)
}
/**
* Returns true if the elements in both lists are the same irrespective of ordering.
*/
private def matchSameElements[T](list: util.List[T]): util.List[T] = {
EasyMock.reportMatcher(new IArgumentMatcher {
def matches(argument: Any): Boolean = argument match {
case s: util.List[_] => s.asScala.toSet == list.asScala.toSet
case _ => false
}
def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)")
})
null
}
@Test @Test
def testDescribeConfigsWithAuthorizer(): Unit = { def testDescribeConfigsWithAuthorizer(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
@ -868,7 +771,7 @@ class KafkaApisTest {
1, logIfAllowed, logIfDenied)) 1, logIfAllowed, logIfDenied))
EasyMock.expect(authorizer.authorize( EasyMock.expect(authorizer.authorize(
anyObject[RequestContext], matchSameElements(expectedAuthorizedActions.asJava) anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedAuthorizedActions.asJava)
)).andAnswer { () => )).andAnswer { () =>
val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
actions.map { action => actions.map { action =>

View File

@ -130,7 +130,6 @@ object TestUtils extends Logging {
f f
} }
/** /**
* Create a temporary file * Create a temporary file
*/ */

View File

@ -192,6 +192,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/> <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match> </Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="AuthHelper.scala"/>
<Package name="kafka.server"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match> <Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 --> <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="KafkaApis.scala"/> <Source name="KafkaApis.scala"/>