diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index 5ada90af263..c59dda61bf8 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -41,6 +41,7 @@ + diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index 86f06981ff8..c750ec64be6 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger import kafka.api.KAFKA_2_0_IV1 -import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls} +import kafka.security.authorizer.AclAuthorizer.{AclSets, ResourceOrdering, VersionedAcls} import kafka.security.authorizer.AclEntry.ResourceSeparator import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.CoreUtils.{inReadLock, inWriteLock} @@ -63,6 +63,12 @@ object AclAuthorizer { case class VersionedAcls(acls: Set[AclEntry], zkVersion: Int) { def exists: Boolean = zkVersion != ZkVersion.UnknownVersion } + + class AclSets(sets: Set[AclEntry]*) { + def find(p: AclEntry => Boolean): Option[AclEntry] = sets.flatMap(_.find(p)).headOption + def isEmpty: Boolean = !sets.exists(_.nonEmpty) + } + val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion) val WildcardHost = "*" @@ -293,7 +299,7 @@ class AclAuthorizer extends Authorizer with Logging { val host = requestContext.clientAddress.getHostAddress val operation = action.operation - def isEmptyAclAndAuthorized(acls: Set[AclEntry]): Boolean = { + def isEmptyAclAndAuthorized(acls: AclSets): Boolean = { if (acls.isEmpty) { // No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound") @@ -301,12 +307,12 @@ class AclAuthorizer extends Authorizer with Logging { } else false } - def denyAclExists(acls: Set[AclEntry]): Boolean = { + def denyAclExists(acls: AclSets): Boolean = { // Check if there are any Deny ACLs which would forbid this operation. matchingAclExists(operation, resource, principal, host, DENY, acls) } - def allowAclExists(acls: Set[AclEntry]): Boolean = { + def allowAclExists(acls: AclSets): Boolean = { // Check if there are any Allow ACLs which would allow this operation. // Allowing read, write, delete, or alter implies allowing describe. // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance. @@ -339,7 +345,7 @@ class AclAuthorizer extends Authorizer with Logging { } else false } - private def matchingAcls(resourceType: ResourceType, resourceName: String): Set[AclEntry] = { + private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSets = { inReadLock(lock) { val wildcard = aclCache.get(new ResourcePattern(resourceType, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL)) .map(_.acls) @@ -357,7 +363,7 @@ class AclAuthorizer extends Authorizer with Logging { .flatMap { _.acls } .toSet - prefixed ++ wildcard ++ literal + new AclSets(prefixed, wildcard, literal) } } @@ -366,7 +372,7 @@ class AclAuthorizer extends Authorizer with Logging { principal: KafkaPrincipal, host: String, permissionType: AclPermissionType, - acls: Set[AclEntry]): Boolean = { + acls: AclSets): Boolean = { acls.find { acl => acl.permissionType == permissionType && (acl.kafkaPrincipal == principal || acl.kafkaPrincipal == AclEntry.WildcardPrincipal) && diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java index 84650d34c73..c36fb314a45 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java @@ -24,10 +24,17 @@ import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.authorizer.Action; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -45,8 +52,12 @@ import scala.collection.JavaConverters; import scala.collection.immutable.TreeMap; import java.lang.reflect.Field; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -65,13 +76,22 @@ public class AclAuthorizerBenchmark { @Param({"5", "10", "15"}) private int aclCount; + private int hostPreCount = 1000; + private AclAuthorizer aclAuthorizer = new AclAuthorizer(); private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); + private List actions = new ArrayList<>(); + private RequestContext context; @Setup(Level.Trial) public void setup() throws Exception { setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("aclCache").getName(), prepareAclCache()); + actions = Collections.singletonList(new Action(AclOperation.WRITE, new ResourcePattern(ResourceType.TOPIC, "resource-1", PatternType.LITERAL), + 1, true, true)); + context = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(), "someclient", 1), + "1", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, + ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY); } private void setFieldValue(Object obj, String fieldName, Object value) throws Exception { @@ -98,6 +118,23 @@ public class AclAuthorizerBenchmark { } } + ResourcePattern resourceWildcard = new ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL); + ResourcePattern resourcePrefix = new ResourcePattern(ResourceType.TOPIC, "resource-", PatternType.PREFIXED); + + Set entriesWildcard = aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>()); + Set entriesPrefix = aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>()); + + for (int hostId = 0; hostId < hostPreCount; hostId++) { + AccessControlEntry ace = new AccessControlEntry(principal.toString(), "127.0.0." + hostId, AclOperation.READ, AclPermissionType.ALLOW); + entriesPrefix.add(new AclEntry(ace)); + } + + // get dynamic entries number for wildcard acl + for (int hostId = 0; hostId < resourceCount / 10; hostId++) { + AccessControlEntry ace = new AccessControlEntry(principal.toString(), "127.0.0." + hostId, AclOperation.READ, AclPermissionType.ALLOW); + entriesWildcard.add(new AclEntry(ace)); + } + TreeMap aclCache = new TreeMap<>(new AclAuthorizer.ResourceOrdering()); for (Map.Entry> entry : aclEntries.entrySet()) { aclCache = aclCache.updated(entry.getKey(), @@ -116,4 +153,9 @@ public class AclAuthorizerBenchmark { public void testAclsIterator() { aclAuthorizer.acls(AclBindingFilter.ANY); } + + @Benchmark + public void testAuthorizer() throws Exception { + aclAuthorizer.authorize(context, actions); + } }