Kafka 9626: Improve ACLAuthorizer.acls() performance

This PR avoids creation of unnecessary sets in AclAuthorizer.acls() method implementation.

Perf results:
**Old**
```
Benchmark                                (aclCount)  (resourceCount)  Mode  Cnt    Score   Error  Units
AclAuthorizerBenchmark.testAclsIterator           5             5000  avgt   15    5.821 ? 0.309  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            10000  avgt   15   15.303 ? 0.107  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            50000  avgt   15   74.976 ? 0.543  ms/op
AclAuthorizerBenchmark.testAclsIterator          10             5000  avgt   15   15.366 ? 0.184  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            10000  avgt   15   29.899 ? 0.129  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            50000  avgt   15  167.301 ? 1.723  ms/op
AclAuthorizerBenchmark.testAclsIterator          15             5000  avgt   15   21.980 ? 0.114  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            10000  avgt   15   44.385 ? 0.255  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            50000  avgt   15  241.919 ? 3.955  ms/op
```
**New**

```
Benchmark                                (aclCount)  (resourceCount)  Mode  Cnt   Score   Error  Units
AclAuthorizerBenchmark.testAclsIterator           5             5000  avgt   15   0.666 ? 0.004  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            10000  avgt   15   1.427 ? 0.015  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            50000  avgt   15  21.410 ? 0.225  ms/op
AclAuthorizerBenchmark.testAclsIterator          10             5000  avgt   15   1.230 ? 0.018  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            10000  avgt   15   4.303 ? 0.744  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            50000  avgt   15  36.724 ? 0.409  ms/op
AclAuthorizerBenchmark.testAclsIterator          15             5000  avgt   15   2.433 ? 0.379  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            10000  avgt   15   9.818 ? 0.214  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            50000  avgt   15  52.886 ? 0.525  ms/op
```

Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Lucas Bradstreet <lucas@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>, Lucas Bradstreet <lucas@confluent.io>

Closes #8199 from omkreddy/KAFKA-9626
This commit is contained in:
Manikumar Reddy 2020-03-03 01:51:09 +05:30 committed by Manikumar Reddy
parent ea0c027531
commit 8dff0b168a
4 changed files with 138 additions and 10 deletions

View File

@ -40,7 +40,7 @@
<allow class="kafka.utils.KafkaScheduler"/>
<allow class="org.apache.kafka.clients.FetchSessionHandler"/>
<allow pkg="org.mockito"/>
<allow pkg="kafka.security.authorizer"/>
<subpackage name="cache">
</subpackage>

View File

@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
import kafka.security.authorizer.AclAuthorizer.VersionedAcls
import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
@ -36,7 +36,7 @@ import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, Un
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.resource._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, SecurityUtils}
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig
@ -67,7 +67,7 @@ object AclAuthorizer {
val WildcardHost = "*"
// Orders by resource type, then resource pattern type and finally reverse ordering by name.
private object ResourceOrdering extends Ordering[ResourcePattern] {
class ResourceOrdering extends Ordering[ResourcePattern] {
def compare(a: ResourcePattern, b: ResourcePattern): Int = {
val rt = a.resourceType.compareTo(b.resourceType)
@ -116,7 +116,7 @@ class AclAuthorizer extends Authorizer with Logging {
private var extendedAclSupport: Boolean = _
@volatile
private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(AclAuthorizer.ResourceOrdering)
private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
private val lock = new ReentrantReadWriteLock()
// The maximum number of times we should try to update the resource acls in zookeeper before failing;
@ -260,10 +260,15 @@ class AclAuthorizer extends Authorizer with Logging {
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
inReadLock(lock) {
unorderedAcls.flatMap { case (resource, versionedAcls) =>
versionedAcls.acls.map(acl => new AclBinding(resource, acl.ace))
.filter(filter.matches)
}.asJava
val aclBindings = new util.ArrayList[AclBinding]()
unorderedAcls.foreach { case (resource, versionedAcls) =>
versionedAcls.acls.foreach { acl =>
val binding = new AclBinding(resource, acl.ace)
if (filter.matches(binding))
aclBindings.add(binding)
}
}
aclBindings
}
}

View File

@ -427,4 +427,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
</Match>
<Match>
<!-- Suppress warnings related to jmh generated code -->
<Package name="org.apache.kafka.jmh.acl.generated"/>
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.acl;
import kafka.security.authorizer.AclAuthorizer;
import kafka.security.authorizer.AclAuthorizer.VersionedAcls;
import kafka.security.authorizer.AclEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.collection.JavaConverters;
import scala.collection.immutable.TreeMap;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AclAuthorizerBenchmark {
@Param({"5000", "10000", "50000"})
private int resourceCount;
//no. of. rules per resource
@Param({"5", "10", "15"})
private int aclCount;
private AclAuthorizer aclAuthorizer = new AclAuthorizer();
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
@Setup(Level.Trial)
public void setup() throws Exception {
setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("aclCache").getName(),
prepareAclCache());
}
private void setFieldValue(Object obj, String fieldName, Object value) throws Exception {
Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(obj, value);
}
private TreeMap<ResourcePattern, VersionedAcls> prepareAclCache() {
Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
ResourcePattern resource = new ResourcePattern(
(resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC,
"resource-" + resourceId,
(resourceId % 5 == 0) ? PatternType.PREFIXED : PatternType.LITERAL);
Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
for (int aclId = 0; aclId < aclCount; aclId++) {
AccessControlEntry ace = new AccessControlEntry(principal.toString() + aclId,
"*", AclOperation.READ, AclPermissionType.ALLOW);
entries.add(new AclEntry(ace));
}
}
TreeMap<ResourcePattern, VersionedAcls> aclCache = new TreeMap<>(new AclAuthorizer.ResourceOrdering());
for (Map.Entry<ResourcePattern, Set<AclEntry>> entry : aclEntries.entrySet()) {
aclCache = aclCache.updated(entry.getKey(),
new VersionedAcls(JavaConverters.asScalaSetConverter(entry.getValue()).asScala().toSet(), 1));
}
return aclCache;
}
@TearDown(Level.Trial)
public void tearDown() {
aclAuthorizer.close();
}
@Benchmark
public void testAclsIterator() {
aclAuthorizer.acls(AclBindingFilter.ANY);
}
}