KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer

To dismiss the usage of operation ++ against Set which is slow when Set has many entries. This pr introduces a new class 'AclSets' which takes multiple Sets as parameters and do 'find' against them one by one. For more details about perf and benchmark, refer to [KAFKA-9685](https://issues.apache.org/jira/browse/KAFKA-9685)

Author: jiao <jiao.zhang@linecorp.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #8261 from jiao-zhangS/jira-9685
This commit is contained in:
jiao 2020-03-13 20:53:29 +05:30 committed by Manikumar Reddy
parent 2d2311d75c
commit e3ccf20794
3 changed files with 56 additions and 7 deletions

View File

@ -41,6 +41,7 @@
<allow class="org.apache.kafka.clients.FetchSessionHandler"/> <allow class="org.apache.kafka.clients.FetchSessionHandler"/>
<allow pkg="org.mockito"/> <allow pkg="org.mockito"/>
<allow pkg="kafka.security.authorizer"/> <allow pkg="kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/>
<subpackage name="cache"> <subpackage name="cache">
</subpackage> </subpackage>

View File

@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import com.typesafe.scalalogging.Logger import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1 import kafka.api.KAFKA_2_0_IV1
import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls} import kafka.security.authorizer.AclAuthorizer.{AclSets, ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclEntry.ResourceSeparator import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
@ -63,6 +63,12 @@ object AclAuthorizer {
case class VersionedAcls(acls: Set[AclEntry], zkVersion: Int) { case class VersionedAcls(acls: Set[AclEntry], zkVersion: Int) {
def exists: Boolean = zkVersion != ZkVersion.UnknownVersion def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
} }
class AclSets(sets: Set[AclEntry]*) {
def find(p: AclEntry => Boolean): Option[AclEntry] = sets.flatMap(_.find(p)).headOption
def isEmpty: Boolean = !sets.exists(_.nonEmpty)
}
val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion) val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
val WildcardHost = "*" val WildcardHost = "*"
@ -293,7 +299,7 @@ class AclAuthorizer extends Authorizer with Logging {
val host = requestContext.clientAddress.getHostAddress val host = requestContext.clientAddress.getHostAddress
val operation = action.operation val operation = action.operation
def isEmptyAclAndAuthorized(acls: Set[AclEntry]): Boolean = { def isEmptyAclAndAuthorized(acls: AclSets): Boolean = {
if (acls.isEmpty) { if (acls.isEmpty) {
// No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found // No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound") authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
@ -301,12 +307,12 @@ class AclAuthorizer extends Authorizer with Logging {
} else false } else false
} }
def denyAclExists(acls: Set[AclEntry]): Boolean = { def denyAclExists(acls: AclSets): Boolean = {
// Check if there are any Deny ACLs which would forbid this operation. // Check if there are any Deny ACLs which would forbid this operation.
matchingAclExists(operation, resource, principal, host, DENY, acls) matchingAclExists(operation, resource, principal, host, DENY, acls)
} }
def allowAclExists(acls: Set[AclEntry]): Boolean = { def allowAclExists(acls: AclSets): Boolean = {
// Check if there are any Allow ACLs which would allow this operation. // Check if there are any Allow ACLs which would allow this operation.
// Allowing read, write, delete, or alter implies allowing describe. // Allowing read, write, delete, or alter implies allowing describe.
// See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance. // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
@ -339,7 +345,7 @@ class AclAuthorizer extends Authorizer with Logging {
} else false } else false
} }
private def matchingAcls(resourceType: ResourceType, resourceName: String): Set[AclEntry] = { private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSets = {
inReadLock(lock) { inReadLock(lock) {
val wildcard = aclCache.get(new ResourcePattern(resourceType, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL)) val wildcard = aclCache.get(new ResourcePattern(resourceType, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL))
.map(_.acls) .map(_.acls)
@ -357,7 +363,7 @@ class AclAuthorizer extends Authorizer with Logging {
.flatMap { _.acls } .flatMap { _.acls }
.toSet .toSet
prefixed ++ wildcard ++ literal new AclSets(prefixed, wildcard, literal)
} }
} }
@ -366,7 +372,7 @@ class AclAuthorizer extends Authorizer with Logging {
principal: KafkaPrincipal, principal: KafkaPrincipal,
host: String, host: String,
permissionType: AclPermissionType, permissionType: AclPermissionType,
acls: Set[AclEntry]): Boolean = { acls: AclSets): Boolean = {
acls.find { acl => acls.find { acl =>
acl.permissionType == permissionType && acl.permissionType == permissionType &&
(acl.kafkaPrincipal == principal || acl.kafkaPrincipal == AclEntry.WildcardPrincipal) && (acl.kafkaPrincipal == principal || acl.kafkaPrincipal == AclEntry.WildcardPrincipal) &&

View File

@ -24,10 +24,17 @@ import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.Action;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Fork;
@ -45,8 +52,12 @@ import scala.collection.JavaConverters;
import scala.collection.immutable.TreeMap; import scala.collection.immutable.TreeMap;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -65,13 +76,22 @@ public class AclAuthorizerBenchmark {
@Param({"5", "10", "15"}) @Param({"5", "10", "15"})
private int aclCount; private int aclCount;
private int hostPreCount = 1000;
private AclAuthorizer aclAuthorizer = new AclAuthorizer(); private AclAuthorizer aclAuthorizer = new AclAuthorizer();
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private List<Action> actions = new ArrayList<>();
private RequestContext context;
@Setup(Level.Trial) @Setup(Level.Trial)
public void setup() throws Exception { public void setup() throws Exception {
setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("aclCache").getName(), setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("aclCache").getName(),
prepareAclCache()); prepareAclCache());
actions = Collections.singletonList(new Action(AclOperation.WRITE, new ResourcePattern(ResourceType.TOPIC, "resource-1", PatternType.LITERAL),
1, true, true));
context = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(), "someclient", 1),
"1", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
} }
private void setFieldValue(Object obj, String fieldName, Object value) throws Exception { private void setFieldValue(Object obj, String fieldName, Object value) throws Exception {
@ -98,6 +118,23 @@ public class AclAuthorizerBenchmark {
} }
} }
ResourcePattern resourceWildcard = new ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL);
ResourcePattern resourcePrefix = new ResourcePattern(ResourceType.TOPIC, "resource-", PatternType.PREFIXED);
Set<AclEntry> entriesWildcard = aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
Set<AclEntry> entriesPrefix = aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
for (int hostId = 0; hostId < hostPreCount; hostId++) {
AccessControlEntry ace = new AccessControlEntry(principal.toString(), "127.0.0." + hostId, AclOperation.READ, AclPermissionType.ALLOW);
entriesPrefix.add(new AclEntry(ace));
}
// get dynamic entries number for wildcard acl
for (int hostId = 0; hostId < resourceCount / 10; hostId++) {
AccessControlEntry ace = new AccessControlEntry(principal.toString(), "127.0.0." + hostId, AclOperation.READ, AclPermissionType.ALLOW);
entriesWildcard.add(new AclEntry(ace));
}
TreeMap<ResourcePattern, VersionedAcls> aclCache = new TreeMap<>(new AclAuthorizer.ResourceOrdering()); TreeMap<ResourcePattern, VersionedAcls> aclCache = new TreeMap<>(new AclAuthorizer.ResourceOrdering());
for (Map.Entry<ResourcePattern, Set<AclEntry>> entry : aclEntries.entrySet()) { for (Map.Entry<ResourcePattern, Set<AclEntry>> entry : aclEntries.entrySet()) {
aclCache = aclCache.updated(entry.getKey(), aclCache = aclCache.updated(entry.getKey(),
@ -116,4 +153,9 @@ public class AclAuthorizerBenchmark {
public void testAclsIterator() { public void testAclsIterator() {
aclAuthorizer.acls(AclBindingFilter.ANY); aclAuthorizer.acls(AclBindingFilter.ANY);
} }
@Benchmark
public void testAuthorizer() throws Exception {
aclAuthorizer.authorize(context, actions);
}
} }