From 6f9e73cfd87d551a27fbd6d68ce2a0cf94e857b1 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sat, 16 Jan 2021 06:44:02 -0800 Subject: [PATCH] MINOR: Move a few more methods to AuthHelper (#9913) And move some tests to `AuthHelperTest`. Reviewers: David Arthur --- .../main/scala/kafka/server/AuthHelper.scala | 64 +++++++- .../main/scala/kafka/server/KafkaApis.scala | 143 +++--------------- .../main/scala/kafka/utils/CoreUtils.scala | 3 +- .../unit/kafka/server/AuthHelperTest.scala | 142 +++++++++++++++++ .../unit/kafka/server/KafkaApisTest.scala | 101 +------------ .../scala/unit/kafka/utils/TestUtils.scala | 1 - gradle/spotbugs-exclude.xml | 7 + 7 files changed, 238 insertions(+), 223 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/server/AuthHelperTest.scala diff --git a/core/src/main/scala/kafka/server/AuthHelper.scala b/core/src/main/scala/kafka/server/AuthHelper.scala index d68fe17e227..50a13510ac0 100644 --- a/core/src/main/scala/kafka/server/AuthHelper.scala +++ b/core/src/main/scala/kafka/server/AuthHelper.scala @@ -19,9 +19,9 @@ package kafka.server import java.lang.{Byte => JByte} import java.util.Collections - import kafka.network.RequestChannel import kafka.security.authorizer.AclEntry +import kafka.utils.CoreUtils import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.errors.ClusterAuthorizationException import org.apache.kafka.common.requests.RequestContext @@ -31,9 +31,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} +import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ - class AuthHelper(authorizer: Option[Authorizer]) { def authorize(requestContext: RequestContext, @@ -70,4 +70,64 @@ class AuthHelper(authorizer: Option[Authorizer]) { } Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava) } + + def authorizeByResourceType(requestContext: RequestContext, operation: AclOperation, + resourceType: ResourceType): Boolean = { + authorizer.forall { authZ => + authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + } + + def partitionSeqByAuthorized[T](requestContext: RequestContext, + operation: AclOperation, + resourceType: ResourceType, + resources: Seq[T], + logIfAllowed: Boolean = true, + logIfDenied: Boolean = true)(resourceName: T => String): (Seq[T], Seq[T]) = { + authorizer match { + case Some(_) => + val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType, + resources, logIfAllowed, logIfDenied)(resourceName) + resources.partition(resource => authorizedResourceNames.contains(resourceName(resource))) + case None => (resources, Seq.empty) + } + } + + def partitionMapByAuthorized[K, V](requestContext: RequestContext, + operation: AclOperation, + resourceType: ResourceType, + resources: Map[K, V], + logIfAllowed: Boolean = true, + logIfDenied: Boolean = true)(resourceName: K => String): (Map[K, V], Map[K, V]) = { + authorizer match { + case Some(_) => + val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType, + resources.keySet, logIfAllowed, logIfDenied)(resourceName) + resources.partition { case (k, _) => authorizedResourceNames.contains(resourceName(k)) } + case None => (resources, Map.empty) + } + } + + def filterByAuthorized[T](requestContext: RequestContext, + operation: AclOperation, + resourceType: ResourceType, + resources: Iterable[T], + logIfAllowed: Boolean = true, + logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = { + authorizer match { + case Some(authZ) => + val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _) + val actions = resourceNameToCount.iterator.map { case (resourceName, count) => + val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) + new Action(operation, resource, count, logIfAllowed, logIfDenied) + }.toBuffer + authZ.authorize(requestContext, actions.asJava).asScala + .zip(resourceNameToCount.keySet) + .collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED => + resourceName + }.toSet + case None => resources.iterator.map(resourceName).toSet + } + } + } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1b5c4d078a8..41f33225491 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger import kafka.admin.{AdminUtils, RackAwareMode} import kafka.api.{ApiVersion, ElectLeadersRequestOps, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0} -import kafka.cluster.Partition import kafka.common.OffsetAndMetadata import kafka.controller.{KafkaController, ReplicaAssignment} import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult} @@ -74,7 +73,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType._ -import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} +import org.apache.kafka.common.resource.{Resource, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time} @@ -253,25 +252,6 @@ class KafkaApis(val requestChannel: RequestChannel, val correlationId = request.header.correlationId val leaderAndIsrRequest = request.body[LeaderAndIsrRequest] - def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit = { - // for each new leader or follower, call coordinator to handle consumer group migration. - // this callback is invoked under the replica state change lock to ensure proper order of - // leadership changes - updatedLeaders.foreach { partition => - if (partition.topic == GROUP_METADATA_TOPIC_NAME) - groupCoordinator.onElection(partition.partitionId) - else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) - txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch) - } - - updatedFollowers.foreach { partition => - if (partition.topic == GROUP_METADATA_TOPIC_NAME) - groupCoordinator.onResignation(partition.partitionId) - else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) - txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch)) - } - } - authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch)) { // When the broker restarts very quickly, it is possible for this broker to receive request intended @@ -280,7 +260,8 @@ class KafkaApis(val requestChannel: RequestChannel, s"${leaderAndIsrRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}") requestHelper.sendResponseExemptThrottle(request, leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_BROKER_EPOCH.exception)) } else { - val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) + val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, + RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _)) requestHelper.sendResponseExemptThrottle(request, response) } } @@ -453,7 +434,7 @@ class KafkaApis(val requestChannel: RequestChannel, val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] val topics = offsetCommitRequest.data.topics.asScala - val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) + val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) for (topicData <- topics) { for (partitionData <- topicData.partitions.asScala) { val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) @@ -559,7 +540,7 @@ class KafkaApis(val requestChannel: RequestChannel, val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() // cache the result to avoid redundant authorization calls - val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, + val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, produceRequest.data().topicData().asScala)(_.name()) produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition => @@ -724,7 +705,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchContext.foreachPartition { (topicPartition, partitionData) => partitionDatas += topicPartition -> partitionData } - val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic) + val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic) partitionDatas.foreach { case (topicPartition, data) => if (!authorizedTopics.contains(topicPartition.topic)) erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED) @@ -959,7 +940,7 @@ class KafkaApis(val requestChannel: RequestChannel, val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetsRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => @@ -1023,7 +1004,7 @@ class KafkaApis(val requestChannel: RequestChannel, .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET) } - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => @@ -1212,7 +1193,7 @@ class KafkaApis(val requestChannel: RequestChannel, else metadataRequest.topics.asScala.toSet - val authorizedForDescribeTopics = filterByAuthorized(request.context, DESCRIBE, TOPIC, + val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, topics, logIfDenied = !metadataRequest.isAllTopics)(identity) var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains) var unauthorizedForCreateTopics = Set[String]() @@ -1221,7 +1202,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { - val authorizedForCreateTopics = filterByAuthorized(request.context, CREATE, TOPIC, + val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC, nonExistingTopics)(identity) unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics) authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics) @@ -1307,7 +1288,7 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetFetchRequest = request.body[OffsetFetchRequest] def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], Seq[TopicPartition]) = - partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic) + authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic) def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = @@ -1351,7 +1332,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchRequest.getErrorResponse(requestThrottleMs, error) else { // clients are not allowed to see offsets for topics that are not authorized for Describe - val (authorizedPartitionData, _) = partitionMapByAuthorized(request.context, + val (authorizedPartitionData, _) = authHelper.partitionMapByAuthorized(request.context, DESCRIBE, TOPIC, allPartitionData)(_.topic) new OffsetFetchResponse(requestThrottleMs, Errors.NONE, authorizedPartitionData.asJava) } @@ -1632,7 +1613,7 @@ class KafkaApis(val requestChannel: RequestChannel, val deleteGroupsRequest = request.body[DeleteGroupsRequest] val groups = deleteGroupsRequest.data.groupsNames.asScala.distinct - val (authorizedGroups, unauthorizedGroups) = partitionSeqByAuthorized(request.context, DELETE, GROUP, + val (authorizedGroups, unauthorizedGroups) = authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP, groups)(identity) val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet) ++ @@ -1827,8 +1808,8 @@ class KafkaApis(val requestChannel: RequestChannel, val topics = createTopicsRequest.data.topics.asScala.map(_.name) val authorizedTopics = if (hasClusterAuthorization) topics.toSet - else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) - val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, + else authHelper.filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) + val authorizedForDescribeConfigs = authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap results.forEach { topic => @@ -1908,7 +1889,7 @@ class KafkaApis(val requestChannel: RequestChannel, .filter { _._2.size > 1 } .keySet val notDuped = topics.filterNot(topic => dupes.contains(topic.name)) - val (authorized, unauthorized) = partitionSeqByAuthorized(request.context, ALTER, TOPIC, + val (authorized, unauthorized) = authHelper.partitionSeqByAuthorized(request.context, ALTER, TOPIC, notDuped)(_.name) val (queuedForDeletion, valid) = authorized.partition { topic => @@ -1966,7 +1947,7 @@ class KafkaApis(val requestChannel: RequestChannel, results.add(new DeletableTopicResult() .setName(topic)) } - val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC, + val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, results.asScala)(_.name) results.forEach { topic => if (!authorizedTopics.contains(topic.name)) @@ -2007,7 +1988,7 @@ class KafkaApis(val requestChannel: RequestChannel, val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]() val topics = deleteRecordsRequest.data.topics.asScala - val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC, topics)(_.name) + val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, topics)(_.name) val deleteTopicPartitions = topics.flatMap { deleteTopic => deleteTopic.partitions.asScala.map { deletePartition => new TopicPartition(deleteTopic.name, deletePartition.partitionIndex) -> deletePartition.offset @@ -2076,7 +2057,7 @@ class KafkaApis(val requestChannel: RequestChannel, return } } else if (!authHelper.authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME, true, false) - && !authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) { + && !authHelper.authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) { requestHelper.sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception) return } @@ -2279,7 +2260,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() val authorizedPartitions = mutable.Set[TopicPartition]() - val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, + val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) for (topicPartition <- partitionsToAdd) { if (!authorizedTopics.contains(topicPartition.topic)) @@ -2394,7 +2375,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() val committedOffsets = txnOffsetCommitRequest.offsets.asScala - val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic) + val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic) for ((topicPartition, commitedOffset) <- committedOffsets) { if (!authorizedTopics.contains(topicPartition.topic)) @@ -2571,7 +2552,7 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedTopics, unauthorizedTopics) = if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) (topics, Seq.empty[OffsetForLeaderTopic]) - else partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic) + else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic) val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedTopics) val endOffsetsForUnauthorizedPartitions = unauthorizedTopics.map { offsetForLeaderTopic => @@ -3037,7 +3018,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) { val topics = offsetDeleteRequest.data.topics.asScala - val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) + val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) val topicPartitionErrors = mutable.Map[TopicPartition, Errors]() val topicPartitions = mutable.ArrayBuffer[TopicPartition]() @@ -3245,7 +3226,7 @@ class KafkaApis(val requestChannel: RequestChannel, s"${config.interBrokerListenerName}.") requestHelper.closeConnection(request, Collections.emptyMap()) return - } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + } else if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope")) return @@ -3350,82 +3331,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // private package for testing - private[server] def authorize(requestContext: RequestContext, - operation: AclOperation, - resourceType: ResourceType, - resourceName: String, - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true, - refCount: Int = 1): Boolean = { - authorizer.forall { authZ => - val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) - val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied)) - authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED - } - } - - private def authorizeByResourceType(requestContext: RequestContext, - operation: AclOperation, - resourceType: ResourceType): Boolean = { - authorizer.forall { authZ => - authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED - } - } - - // private package for testing - private[server] def filterByAuthorized[T](requestContext: RequestContext, - operation: AclOperation, - resourceType: ResourceType, - resources: Iterable[T], - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = { - authorizer match { - case Some(authZ) => - val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _) - val actions = resourceNameToCount.iterator.map { case (resourceName, count) => - val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) - new Action(operation, resource, count, logIfAllowed, logIfDenied) - }.toBuffer - authZ.authorize(requestContext, actions.asJava).asScala - .zip(resourceNameToCount.keySet) - .collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED => - resourceName - }.toSet - case None => resources.iterator.map(resourceName).toSet - } - } - - private def partitionSeqByAuthorized[T](requestContext: RequestContext, - operation: AclOperation, - resourceType: ResourceType, - resources: Seq[T], - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true)(resourceName: T => String): (Seq[T], Seq[T]) = { - authorizer match { - case Some(_) => - val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType, - resources, logIfAllowed, logIfDenied)(resourceName) - resources.partition(resource => authorizedResourceNames.contains(resourceName(resource))) - case None => (resources, Seq.empty) - } - } - - private def partitionMapByAuthorized[K, V](requestContext: RequestContext, - operation: AclOperation, - resourceType: ResourceType, - resources: Map[K, V], - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true)(resourceName: K => String): (Map[K, V], Map[K, V]) = { - authorizer match { - case Some(_) => - val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType, - resources.keySet, logIfAllowed, logIfDenied)(resourceName) - resources.partition { case (k, _) => authorizedResourceNames.contains(resourceName(k)) } - case None => (resources, Map.empty) - } - } - private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordConversionStats): Unit = { diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index ff58a871881..96a71c28518 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -23,10 +23,9 @@ import java.nio.channels._ import java.util.concurrent.locks.{Lock, ReadWriteLock} import java.lang.management._ import java.util.{Base64, Properties, UUID} - import com.typesafe.scalalogging.Logger -import javax.management._ +import javax.management._ import scala.collection._ import scala.collection.{Seq, mutable} import kafka.cluster.EndPoint diff --git a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala new file mode 100644 index 00000000000..c48b051cde9 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.net.InetAddress +import java.util +import org.apache.kafka.common.acl.AclOperation +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} +import org.easymock.EasyMock._ +import org.easymock.{EasyMock, IArgumentMatcher} +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.Seq +import scala.jdk.CollectionConverters._ + +class AuthHelperTest { + import AuthHelperTest._ + + private val clientId = "" + + @Test + def testAuthorize(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + val operation = AclOperation.WRITE + val resourceType = ResourceType.TOPIC + val resourceName = "topic-1" + val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, clientId, 0) + val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost, + KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false) + + val expectedActions = Seq( + new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL), + 1, true, true) + ) + + EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava)) + .andReturn(Seq(AuthorizationResult.ALLOWED).asJava) + .once() + + EasyMock.replay(authorizer) + + val result = new AuthHelper(Some(authorizer)).authorize( + requestContext, operation, resourceType, resourceName) + + verify(authorizer) + + assertEquals(true, result) + } + + @Test + def testFilterByAuthorized(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + val operation = AclOperation.WRITE + val resourceType = ResourceType.TOPIC + val resourceName1 = "topic-1" + val resourceName2 = "topic-2" + val resourceName3 = "topic-3" + val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, + clientId, 0) + val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost, + KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false) + + val expectedActions = Seq( + new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL), + 2, true, true), + new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL), + 1, true, true), + new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL), + 1, true, true), + ) + + EasyMock.expect(authorizer.authorize( + EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava) + )).andAnswer { () => + val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala + actions.map { action => + if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name)) + AuthorizationResult.ALLOWED + else + AuthorizationResult.DENIED + }.asJava + }.once() + + EasyMock.replay(authorizer) + + val result = new AuthHelper(Some(authorizer)).filterByAuthorized( + requestContext, + operation, + resourceType, + // Duplicate resource names should not trigger multiple calls to authorize + Seq(resourceName1, resourceName2, resourceName1, resourceName3) + )(identity) + + verify(authorizer) + + assertEquals(Set(resourceName1, resourceName3), result) + } + +} + +object AuthHelperTest { + + /** + * Similar to `EasyMock.eq`, but matches if both lists have the same elements irrespective of ordering. + */ + def matchSameElements[T](list: java.util.List[T]): java.util.List[T] = { + EasyMock.reportMatcher(new IArgumentMatcher { + def matches(argument: Any): Boolean = argument match { + case l: java.util.List[_] => list.asScala.toSet == l.asScala.toSet + case _ => false + } + def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)") + }) + null + } + +} diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5bad37e7901..f3cd68be200 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -69,7 +69,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils} import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.easymock.EasyMock._ -import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher} +import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.Assert._ import org.junit.{After, Test} import org.mockito.{ArgumentMatchers, Mockito} @@ -148,103 +148,6 @@ class KafkaApisTest { cache) } - @Test - def testAuthorize(): Unit = { - val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) - - val operation = AclOperation.WRITE - val resourceType = ResourceType.TOPIC - val resourceName = "topic-1" - val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, - clientId, 0) - val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost, - KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), - SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false) - - val expectedActions = Seq( - new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL), - 1, true, true) - ) - - EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava)) - .andReturn(Seq(AuthorizationResult.ALLOWED).asJava) - .once() - - EasyMock.replay(authorizer) - - val result = createKafkaApis(authorizer = Some(authorizer)).authHelper.authorize( - requestContext, operation, resourceType, resourceName) - - verify(authorizer) - - assertEquals(true, result) - } - - @Test - def testFilterByAuthorized(): Unit = { - val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) - - val operation = AclOperation.WRITE - val resourceType = ResourceType.TOPIC - val resourceName1 = "topic-1" - val resourceName2 = "topic-2" - val resourceName3 = "topic-3" - val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, - clientId, 0) - val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost, - KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), - SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false) - - val expectedActions = Seq( - new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL), - 2, true, true), - new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL), - 1, true, true), - new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL), - 1, true, true), - ) - - EasyMock.expect(authorizer.authorize( - EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava) - )).andAnswer { () => - val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala - actions.map { action => - if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name)) - AuthorizationResult.ALLOWED - else - AuthorizationResult.DENIED - }.asJava - }.once() - - EasyMock.replay(authorizer) - - val result = createKafkaApis(authorizer = Some(authorizer)).filterByAuthorized( - requestContext, - operation, - resourceType, - // Duplicate resource names should not trigger multiple calls to authorize - Seq(resourceName1, resourceName2, resourceName1, resourceName3) - )(identity) - - verify(authorizer) - - assertEquals(Set(resourceName1, resourceName3), result) - } - - /** - * Returns true if the elements in both lists are the same irrespective of ordering. - */ - private def matchSameElements[T](list: util.List[T]): util.List[T] = { - EasyMock.reportMatcher(new IArgumentMatcher { - def matches(argument: Any): Boolean = argument match { - case s: util.List[_] => s.asScala.toSet == list.asScala.toSet - case _ => false - } - def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)") - }) - null - } - @Test def testDescribeConfigsWithAuthorizer(): Unit = { val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) @@ -868,7 +771,7 @@ class KafkaApisTest { 1, logIfAllowed, logIfDenied)) EasyMock.expect(authorizer.authorize( - anyObject[RequestContext], matchSameElements(expectedAuthorizedActions.asJava) + anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedAuthorizedActions.asJava) )).andAnswer { () => val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala actions.map { action => diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 94301a76571..0141e75935e 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -130,7 +130,6 @@ object TestUtils extends Logging { f } - /** * Create a temporary file */ diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 722bfd1fd84..dd1c6c19fd8 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -192,6 +192,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + + +