diff --git a/core/src/main/scala/kafka/server/AuthHelper.scala b/core/src/main/scala/kafka/server/AuthHelper.scala
index d68fe17e227..50a13510ac0 100644
--- a/core/src/main/scala/kafka/server/AuthHelper.scala
+++ b/core/src/main/scala/kafka/server/AuthHelper.scala
@@ -19,9 +19,9 @@ package kafka.server
import java.lang.{Byte => JByte}
import java.util.Collections
-
import kafka.network.RequestChannel
import kafka.security.authorizer.AclEntry
+import kafka.utils.CoreUtils
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.requests.RequestContext
@@ -31,9 +31,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
-
class AuthHelper(authorizer: Option[Authorizer]) {
def authorize(requestContext: RequestContext,
@@ -70,4 +70,64 @@ class AuthHelper(authorizer: Option[Authorizer]) {
}
Utils.to32BitField(authorizedOps.map(operation => operation.code.asInstanceOf[JByte]).asJava)
}
+
+ def authorizeByResourceType(requestContext: RequestContext, operation: AclOperation,
+ resourceType: ResourceType): Boolean = {
+ authorizer.forall { authZ =>
+ authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
+ }
+ }
+
+ def partitionSeqByAuthorized[T](requestContext: RequestContext,
+ operation: AclOperation,
+ resourceType: ResourceType,
+ resources: Seq[T],
+ logIfAllowed: Boolean = true,
+ logIfDenied: Boolean = true)(resourceName: T => String): (Seq[T], Seq[T]) = {
+ authorizer match {
+ case Some(_) =>
+ val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
+ resources, logIfAllowed, logIfDenied)(resourceName)
+ resources.partition(resource => authorizedResourceNames.contains(resourceName(resource)))
+ case None => (resources, Seq.empty)
+ }
+ }
+
+ def partitionMapByAuthorized[K, V](requestContext: RequestContext,
+ operation: AclOperation,
+ resourceType: ResourceType,
+ resources: Map[K, V],
+ logIfAllowed: Boolean = true,
+ logIfDenied: Boolean = true)(resourceName: K => String): (Map[K, V], Map[K, V]) = {
+ authorizer match {
+ case Some(_) =>
+ val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
+ resources.keySet, logIfAllowed, logIfDenied)(resourceName)
+ resources.partition { case (k, _) => authorizedResourceNames.contains(resourceName(k)) }
+ case None => (resources, Map.empty)
+ }
+ }
+
+ def filterByAuthorized[T](requestContext: RequestContext,
+ operation: AclOperation,
+ resourceType: ResourceType,
+ resources: Iterable[T],
+ logIfAllowed: Boolean = true,
+ logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = {
+ authorizer match {
+ case Some(authZ) =>
+ val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _)
+ val actions = resourceNameToCount.iterator.map { case (resourceName, count) =>
+ val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
+ new Action(operation, resource, count, logIfAllowed, logIfDenied)
+ }.toBuffer
+ authZ.authorize(requestContext, actions.asJava).asScala
+ .zip(resourceNameToCount.keySet)
+ .collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED =>
+ resourceName
+ }.toSet
+ case None => resources.iterator.map(resourceName).toSet
+ }
+ }
+
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1b5c4d078a8..41f33225491 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, ElectLeadersRequestOps, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
-import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.controller.{KafkaController, ReplicaAssignment}
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult}
@@ -74,7 +73,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.resource.{Resource, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
@@ -253,25 +252,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId
val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
- def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit = {
- // for each new leader or follower, call coordinator to handle consumer group migration.
- // this callback is invoked under the replica state change lock to ensure proper order of
- // leadership changes
- updatedLeaders.foreach { partition =>
- if (partition.topic == GROUP_METADATA_TOPIC_NAME)
- groupCoordinator.onElection(partition.partitionId)
- else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
- txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
- }
-
- updatedFollowers.foreach { partition =>
- if (partition.topic == GROUP_METADATA_TOPIC_NAME)
- groupCoordinator.onResignation(partition.partitionId)
- else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
- txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
- }
- }
-
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
@@ -280,7 +260,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"${leaderAndIsrRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}")
requestHelper.sendResponseExemptThrottle(request, leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_BROKER_EPOCH.exception))
} else {
- val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
+ val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
+ RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
requestHelper.sendResponseExemptThrottle(request, response)
}
}
@@ -453,7 +434,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
val topics = offsetCommitRequest.data.topics.asScala
- val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
for (topicData <- topics) {
for (partitionData <- topicData.partitions.asScala) {
val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
@@ -559,7 +540,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
// cache the result to avoid redundant authorization calls
- val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC,
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
produceRequest.data().topicData().asScala)(_.name())
produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
@@ -724,7 +705,7 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchContext.foreachPartition { (topicPartition, partitionData) =>
partitionDatas += topicPartition -> partitionData
}
- val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
partitionDatas.foreach { case (topicPartition, data) =>
if (!authorizedTopics.contains(topicPartition.topic))
erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
@@ -959,7 +940,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetsRequest]
- val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context,
+ val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context,
DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
@@ -1023,7 +1004,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
}
- val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context,
+ val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context,
DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
@@ -1212,7 +1193,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else
metadataRequest.topics.asScala.toSet
- val authorizedForDescribeTopics = filterByAuthorized(request.context, DESCRIBE, TOPIC,
+ val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
topics, logIfDenied = !metadataRequest.isAllTopics)(identity)
var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains)
var unauthorizedForCreateTopics = Set[String]()
@@ -1221,7 +1202,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
- val authorizedForCreateTopics = filterByAuthorized(request.context, CREATE, TOPIC,
+ val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
nonExistingTopics)(identity)
unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
@@ -1307,7 +1288,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetFetchRequest = request.body[OffsetFetchRequest]
def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], Seq[TopicPartition]) =
- partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic)
+ authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic)
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse =
@@ -1351,7 +1332,7 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
else {
// clients are not allowed to see offsets for topics that are not authorized for Describe
- val (authorizedPartitionData, _) = partitionMapByAuthorized(request.context,
+ val (authorizedPartitionData, _) = authHelper.partitionMapByAuthorized(request.context,
DESCRIBE, TOPIC, allPartitionData)(_.topic)
new OffsetFetchResponse(requestThrottleMs, Errors.NONE, authorizedPartitionData.asJava)
}
@@ -1632,7 +1613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val deleteGroupsRequest = request.body[DeleteGroupsRequest]
val groups = deleteGroupsRequest.data.groupsNames.asScala.distinct
- val (authorizedGroups, unauthorizedGroups) = partitionSeqByAuthorized(request.context, DELETE, GROUP,
+ val (authorizedGroups, unauthorizedGroups) = authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP,
groups)(identity)
val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet) ++
@@ -1827,8 +1808,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val topics = createTopicsRequest.data.topics.asScala.map(_.name)
val authorizedTopics =
if (hasClusterAuthorization) topics.toSet
- else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
- val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
+ else authHelper.filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+ val authorizedForDescribeConfigs = authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
results.forEach { topic =>
@@ -1908,7 +1889,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.filter { _._2.size > 1 }
.keySet
val notDuped = topics.filterNot(topic => dupes.contains(topic.name))
- val (authorized, unauthorized) = partitionSeqByAuthorized(request.context, ALTER, TOPIC,
+ val (authorized, unauthorized) = authHelper.partitionSeqByAuthorized(request.context, ALTER, TOPIC,
notDuped)(_.name)
val (queuedForDeletion, valid) = authorized.partition { topic =>
@@ -1966,7 +1947,7 @@ class KafkaApis(val requestChannel: RequestChannel,
results.add(new DeletableTopicResult()
.setName(topic))
}
- val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC,
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
results.asScala)(_.name)
results.forEach { topic =>
if (!authorizedTopics.contains(topic.name))
@@ -2007,7 +1988,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
val topics = deleteRecordsRequest.data.topics.asScala
- val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC, topics)(_.name)
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, topics)(_.name)
val deleteTopicPartitions = topics.flatMap { deleteTopic =>
deleteTopic.partitions.asScala.map { deletePartition =>
new TopicPartition(deleteTopic.name, deletePartition.partitionIndex) -> deletePartition.offset
@@ -2076,7 +2057,7 @@ class KafkaApis(val requestChannel: RequestChannel,
return
}
} else if (!authHelper.authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME, true, false)
- && !authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) {
+ && !authHelper.authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) {
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}
@@ -2279,7 +2260,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
val authorizedPartitions = mutable.Set[TopicPartition]()
- val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC,
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
for (topicPartition <- partitionsToAdd) {
if (!authorizedTopics.contains(topicPartition.topic))
@@ -2394,7 +2375,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
val committedOffsets = txnOffsetCommitRequest.offsets.asScala
- val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic)
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic)
for ((topicPartition, commitedOffset) <- committedOffsets) {
if (!authorizedTopics.contains(topicPartition.topic))
@@ -2571,7 +2552,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedTopics, unauthorizedTopics) =
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false))
(topics, Seq.empty[OffsetForLeaderTopic])
- else partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic)
+ else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic)
val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedTopics)
val endOffsetsForUnauthorizedPartitions = unauthorizedTopics.map { offsetForLeaderTopic =>
@@ -3037,7 +3018,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
val topics = offsetDeleteRequest.data.topics.asScala
- val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
+ val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
@@ -3245,7 +3226,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"${config.interBrokerListenerName}.")
requestHelper.closeConnection(request, Collections.emptyMap())
return
- } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+ } else if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
return
@@ -3350,82 +3331,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- // private package for testing
- private[server] def authorize(requestContext: RequestContext,
- operation: AclOperation,
- resourceType: ResourceType,
- resourceName: String,
- logIfAllowed: Boolean = true,
- logIfDenied: Boolean = true,
- refCount: Int = 1): Boolean = {
- authorizer.forall { authZ =>
- val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
- val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
- authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
- }
- }
-
- private def authorizeByResourceType(requestContext: RequestContext,
- operation: AclOperation,
- resourceType: ResourceType): Boolean = {
- authorizer.forall { authZ =>
- authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
- }
- }
-
- // private package for testing
- private[server] def filterByAuthorized[T](requestContext: RequestContext,
- operation: AclOperation,
- resourceType: ResourceType,
- resources: Iterable[T],
- logIfAllowed: Boolean = true,
- logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = {
- authorizer match {
- case Some(authZ) =>
- val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _)
- val actions = resourceNameToCount.iterator.map { case (resourceName, count) =>
- val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
- new Action(operation, resource, count, logIfAllowed, logIfDenied)
- }.toBuffer
- authZ.authorize(requestContext, actions.asJava).asScala
- .zip(resourceNameToCount.keySet)
- .collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED =>
- resourceName
- }.toSet
- case None => resources.iterator.map(resourceName).toSet
- }
- }
-
- private def partitionSeqByAuthorized[T](requestContext: RequestContext,
- operation: AclOperation,
- resourceType: ResourceType,
- resources: Seq[T],
- logIfAllowed: Boolean = true,
- logIfDenied: Boolean = true)(resourceName: T => String): (Seq[T], Seq[T]) = {
- authorizer match {
- case Some(_) =>
- val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
- resources, logIfAllowed, logIfDenied)(resourceName)
- resources.partition(resource => authorizedResourceNames.contains(resourceName(resource)))
- case None => (resources, Seq.empty)
- }
- }
-
- private def partitionMapByAuthorized[K, V](requestContext: RequestContext,
- operation: AclOperation,
- resourceType: ResourceType,
- resources: Map[K, V],
- logIfAllowed: Boolean = true,
- logIfDenied: Boolean = true)(resourceName: K => String): (Map[K, V], Map[K, V]) = {
- authorizer match {
- case Some(_) =>
- val authorizedResourceNames = filterByAuthorized(requestContext, operation, resourceType,
- resources.keySet, logIfAllowed, logIfDenied)(resourceName)
- resources.partition { case (k, _) => authorizedResourceNames.contains(resourceName(k)) }
- case None => (resources, Map.empty)
- }
- }
-
private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordConversionStats): Unit = {
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index ff58a871881..96a71c28518 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -23,10 +23,9 @@ import java.nio.channels._
import java.util.concurrent.locks.{Lock, ReadWriteLock}
import java.lang.management._
import java.util.{Base64, Properties, UUID}
-
import com.typesafe.scalalogging.Logger
-import javax.management._
+import javax.management._
import scala.collection._
import scala.collection.{Seq, mutable}
import kafka.cluster.EndPoint
diff --git a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
new file mode 100644
index 00000000000..c48b051cde9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.net.InetAddress
+import java.util
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+import org.easymock.EasyMock._
+import org.easymock.{EasyMock, IArgumentMatcher}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class AuthHelperTest {
+ import AuthHelperTest._
+
+ private val clientId = ""
+
+ @Test
+ def testAuthorize(): Unit = {
+ val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+ val operation = AclOperation.WRITE
+ val resourceType = ResourceType.TOPIC
+ val resourceName = "topic-1"
+ val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, clientId, 0)
+ val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+ KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+
+ val expectedActions = Seq(
+ new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
+ 1, true, true)
+ )
+
+ EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava))
+ .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+ .once()
+
+ EasyMock.replay(authorizer)
+
+ val result = new AuthHelper(Some(authorizer)).authorize(
+ requestContext, operation, resourceType, resourceName)
+
+ verify(authorizer)
+
+ assertEquals(true, result)
+ }
+
+ @Test
+ def testFilterByAuthorized(): Unit = {
+ val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+ val operation = AclOperation.WRITE
+ val resourceType = ResourceType.TOPIC
+ val resourceName1 = "topic-1"
+ val resourceName2 = "topic-2"
+ val resourceName3 = "topic-3"
+ val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
+ clientId, 0)
+ val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+ KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+
+ val expectedActions = Seq(
+ new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
+ 2, true, true),
+ new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL),
+ 1, true, true),
+ new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL),
+ 1, true, true),
+ )
+
+ EasyMock.expect(authorizer.authorize(
+ EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava)
+ )).andAnswer { () =>
+ val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
+ actions.map { action =>
+ if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name))
+ AuthorizationResult.ALLOWED
+ else
+ AuthorizationResult.DENIED
+ }.asJava
+ }.once()
+
+ EasyMock.replay(authorizer)
+
+ val result = new AuthHelper(Some(authorizer)).filterByAuthorized(
+ requestContext,
+ operation,
+ resourceType,
+ // Duplicate resource names should not trigger multiple calls to authorize
+ Seq(resourceName1, resourceName2, resourceName1, resourceName3)
+ )(identity)
+
+ verify(authorizer)
+
+ assertEquals(Set(resourceName1, resourceName3), result)
+ }
+
+}
+
+object AuthHelperTest {
+
+ /**
+ * Similar to `EasyMock.eq`, but matches if both lists have the same elements irrespective of ordering.
+ */
+ def matchSameElements[T](list: java.util.List[T]): java.util.List[T] = {
+ EasyMock.reportMatcher(new IArgumentMatcher {
+ def matches(argument: Any): Boolean = argument match {
+ case l: java.util.List[_] => list.asScala.toSet == l.asScala.toSet
+ case _ => false
+ }
+ def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)")
+ })
+ null
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5bad37e7901..f3cd68be200 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -69,7 +69,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.easymock.EasyMock._
-import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher}
+import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert._
import org.junit.{After, Test}
import org.mockito.{ArgumentMatchers, Mockito}
@@ -148,103 +148,6 @@ class KafkaApisTest {
cache)
}
- @Test
- def testAuthorize(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
-
- val operation = AclOperation.WRITE
- val resourceType = ResourceType.TOPIC
- val resourceName = "topic-1"
- val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
- clientId, 0)
- val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
- KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
- SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
-
- val expectedActions = Seq(
- new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
- 1, true, true)
- )
-
- EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava))
- .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- .once()
-
- EasyMock.replay(authorizer)
-
- val result = createKafkaApis(authorizer = Some(authorizer)).authHelper.authorize(
- requestContext, operation, resourceType, resourceName)
-
- verify(authorizer)
-
- assertEquals(true, result)
- }
-
- @Test
- def testFilterByAuthorized(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
-
- val operation = AclOperation.WRITE
- val resourceType = ResourceType.TOPIC
- val resourceName1 = "topic-1"
- val resourceName2 = "topic-2"
- val resourceName3 = "topic-3"
- val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion,
- clientId, 0)
- val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
- KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
- SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
-
- val expectedActions = Seq(
- new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
- 2, true, true),
- new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL),
- 1, true, true),
- new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL),
- 1, true, true),
- )
-
- EasyMock.expect(authorizer.authorize(
- EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava)
- )).andAnswer { () =>
- val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
- actions.map { action =>
- if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name))
- AuthorizationResult.ALLOWED
- else
- AuthorizationResult.DENIED
- }.asJava
- }.once()
-
- EasyMock.replay(authorizer)
-
- val result = createKafkaApis(authorizer = Some(authorizer)).filterByAuthorized(
- requestContext,
- operation,
- resourceType,
- // Duplicate resource names should not trigger multiple calls to authorize
- Seq(resourceName1, resourceName2, resourceName1, resourceName3)
- )(identity)
-
- verify(authorizer)
-
- assertEquals(Set(resourceName1, resourceName3), result)
- }
-
- /**
- * Returns true if the elements in both lists are the same irrespective of ordering.
- */
- private def matchSameElements[T](list: util.List[T]): util.List[T] = {
- EasyMock.reportMatcher(new IArgumentMatcher {
- def matches(argument: Any): Boolean = argument match {
- case s: util.List[_] => s.asScala.toSet == list.asScala.toSet
- case _ => false
- }
- def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)")
- })
- null
- }
-
@Test
def testDescribeConfigsWithAuthorizer(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
@@ -868,7 +771,7 @@ class KafkaApisTest {
1, logIfAllowed, logIfDenied))
EasyMock.expect(authorizer.authorize(
- anyObject[RequestContext], matchSameElements(expectedAuthorizedActions.asJava)
+ anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedAuthorizedActions.asJava)
)).andAnswer { () =>
val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
actions.map { action =>
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 94301a76571..0141e75935e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -130,7 +130,6 @@ object TestUtils extends Logging {
f
}
-
/**
* Create a temporary file
*/
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 722bfd1fd84..dd1c6c19fd8 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -192,6 +192,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
+
+
+
+
+
+