KAFKA-17729: Remove ZK from AuthorizerBenchmark, CheckpointBench and PartitionCreationBench (#17415)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Mickael Maison authored on 2024-10-09 05:07:15 +02:00 (committed by GitHub)
parent 2836f7aaae
commit 07cafdd9df
4 changed files with 17 additions and 341 deletions

AuthorizerBenchmark.java

@@ -17,9 +17,6 @@
 package org.apache.kafka.jmh.acl;

-import kafka.security.authorizer.AclAuthorizer;
-import kafka.security.authorizer.AclAuthorizer.VersionedAcls;
-
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
@@ -40,7 +37,6 @@ import org.apache.kafka.metadata.authorizer.StandardAcl;
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.security.authorizer.AclEntry;
 import org.apache.kafka.server.authorizer.Action;
-import org.apache.kafka.server.authorizer.Authorizer;

 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -66,11 +62,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import scala.collection.JavaConverters;

 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -80,21 +72,6 @@ import scala.collection.JavaConverters;
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AuthorizerBenchmark {
-    public enum AuthorizerType {
-        ACL(AclAuthorizer::new),
-        KRAFT(StandardAuthorizer::new);
-
-        private final Supplier<Authorizer> supplier;
-
-        AuthorizerType(Supplier<Authorizer> supplier) {
-            this.supplier = supplier;
-        }
-
-        Authorizer newAuthorizer() {
-            return supplier.get();
-        }
-    }
-
     @Param({"10000", "50000", "200000"})
     private int resourceCount;
     //no. of. rules per resource
@@ -104,15 +81,11 @@
     @Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"})
     private double denyPercentage;

-    @Param({"ACL", "KRAFT"})
-    private AuthorizerType authorizerType;
-
-    private final int hostPreCount = 1000;
+    private final int hostPreCount = 10;
     private final String resourceNamePrefix = "foo-bar35_resource-";
     private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     private final String authorizeByResourceTypeHostName = "127.0.0.2";
-    private final HashMap<ResourcePattern, AclAuthorizer.VersionedAcls> aclToUpdate = new HashMap<>();
-    private Authorizer authorizer;
+    private StandardAuthorizer authorizer;
     private List<Action> actions = new ArrayList<>();
     private RequestContext authorizeContext;
     private RequestContext authorizeByResourceTypeContext;
@@ -122,13 +95,12 @@
     @Setup(Level.Trial)
     public void setup() throws Exception {
-        authorizer = authorizerType.newAuthorizer();
+        authorizer = new StandardAuthorizer();
         prepareAclCache();
-        prepareAclToUpdate();

         // By adding `-95` to the resource name prefix, we cause the `TreeMap.from/to` call to return
         // most map entries. In such cases, we rely on the filtering based on `String.startsWith`
         // to return the matching ACLs. Using a more efficient data structure (e.g. a prefix
-        // tree) should improve performance significantly).
+        // tree) should improve performance significantly.
         actions = Collections.singletonList(new Action(AclOperation.WRITE,
             new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95, PatternType.LITERAL),
             1, true, true));
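The comment retained in this hunk is worth unpacking: literal and prefixed ACL lookups in these caches take a sorted-map range and then filter each candidate with String.startsWith. The toy Java sketch below illustrates that pattern with invented names and data; it is not code from the patch.

import java.util.NavigableMap;
import java.util.TreeMap;

public class AclRangeScanSketch {
    public static void main(String[] args) {
        // A sorted map standing in for the authorizer's ACL cache.
        NavigableMap<String, String> aclsByResource = new TreeMap<>();
        aclsByResource.put("foo-bar35_resource-1", "acl-1");
        aclsByResource.put("foo-bar35_resource-95", "acl-95");
        aclsByResource.put("foo-bar35_resource-950", "acl-950");

        String resource = "foo-bar35_resource-95";
        // Range scan over every key that could be a prefix of `resource`,
        // then a per-entry startsWith() check: the linear filtering step the
        // benchmark comment says a prefix tree would avoid.
        aclsByResource.subMap("foo-bar35_resource-", true, resource, true)
                .forEach((pattern, acl) -> {
                    if (resource.startsWith(pattern)) {
                        System.out.println(pattern + " -> " + acl);
                    }
                });
    }
}

Seeding the benchmark with resourceNamePrefix + 95 makes that range cover most of the map, which is exactly the worst case the comment describes.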
@@ -212,43 +184,16 @@
     private void setupAcls(Map<ResourcePattern, Set<AclEntry>> aclEntries) {
         for (Map.Entry<ResourcePattern, Set<AclEntry>> entryMap : aclEntries.entrySet()) {
             ResourcePattern resourcePattern = entryMap.getKey();
-            switch (authorizerType) {
-                case ACL:
-                    ((AclAuthorizer) authorizer).updateCache(resourcePattern,
-                        new VersionedAcls(JavaConverters.asScalaSetConverter(entryMap.getValue()).asScala().toSet(), 1));
-                    break;
-                case KRAFT:
-                    for (AclEntry aclEntry : entryMap.getValue()) {
-                        StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, aclEntry));
-                        ((StandardAuthorizer) authorizer).addAcl(Uuid.randomUuid(), standardAcl);
-                    }
-                    ((StandardAuthorizer) authorizer).completeInitialLoad();
-                    break;
-            }
+            for (AclEntry aclEntry : entryMap.getValue()) {
+                StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, aclEntry));
+                authorizer.addAcl(Uuid.randomUuid(), standardAcl);
+            }
         }
+        authorizer.completeInitialLoad();
     }

-    private void prepareAclToUpdate() {
-        scala.collection.mutable.Set<AclEntry> entries = new scala.collection.mutable.HashSet<>();
-        for (int i = 0; i < resourceCount; i++) {
-            scala.collection.immutable.Set<AclEntry> immutable = new scala.collection.immutable.HashSet<>();
-            for (int j = 0; j < aclCount; j++) {
-                entries.add(new AclEntry(new AccessControlEntry(
-                    principal.toString(), "127.0.0" + j, AclOperation.WRITE, AclPermissionType.ALLOW)));
-                immutable = entries.toSet();
-            }
-            aclToUpdate.put(
-                new ResourcePattern(ResourceType.TOPIC, randomResourceName(resourceNamePrefix), PatternType.LITERAL),
-                new AclAuthorizer.VersionedAcls(immutable, i));
-        }
-    }
-
-    private String randomResourceName(String prefix) {
-        return prefix + UUID.randomUUID().toString().substring(0, 5);
-    }
-
     private Boolean shouldDeny() {
         return rand.nextDouble() * 100.0 - eps < denyPercentage;
     }
@@ -272,13 +217,4 @@
     public void testAuthorizeByResourceType() {
         authorizer.authorizeByResourceType(authorizeByResourceTypeContext, AclOperation.READ, ResourceType.TOPIC);
     }
-
-    @Benchmark
-    public void testUpdateCache() {
-        if (authorizerType == AuthorizerType.ACL) {
-            for (Map.Entry<ResourcePattern, VersionedAcls> e : aclToUpdate.entrySet()) {
-                ((AclAuthorizer) authorizer).updateCache(e.getKey(), e.getValue());
-            }
-        }
-    }
 }
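With the AuthorizerType enum and its ACL/KRAFT branches gone, populating the authorizer is a single code path. The minimal sketch below restates that path outside JMH, assuming a prepared list of AclBinding objects; the class and method names are invented, not from the patch.

import java.util.List;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;

public final class StandardAuthorizerSetupSketch {
    // Mirrors the benchmark's setupAcls(): each binding becomes a StandardAcl
    // keyed by a fresh Uuid, with a single completeInitialLoad() at the end.
    static StandardAuthorizer load(List<AclBinding> bindings) {
        StandardAuthorizer authorizer = new StandardAuthorizer();
        for (AclBinding binding : bindings) {
            authorizer.addAcl(Uuid.randomUuid(), StandardAcl.fromAclBinding(binding));
        }
        // The authorizer only serves authorize() calls normally once the
        // initial load is complete, so setup() must not skip this step.
        authorizer.completeInitialLoad();
        return authorizer;
    }
}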

MetadataRequestBenchmark.java (file removed)

@@ -1,248 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.metadata;
import kafka.controller.KafkaController;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.AutoTopicCreationManager;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
import kafka.server.ControllerMutationQuotaManager;
import kafka.server.FetchManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
import kafka.server.SimpleApiVersionManager;
import kafka.server.ZkAdminManager;
import kafka.server.ZkBrokerEpochManager;
import kafka.server.ZkSupport;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.metadata.ZkMetadataCache;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.server.BrokerFeatures;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import scala.Option;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MetadataRequestBenchmark {
private final RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
private final RequestChannelMetrics requestChannelMetrics = Mockito.mock(RequestChannelMetrics.class);
private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
private final GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
private final ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class);
private final TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
private final KafkaController kafkaController = Mockito.mock(KafkaController.class);
private final AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
private final KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private final Metrics metrics = new Metrics();
private final int brokerId = 1;
private final ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId,
MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), false);
private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
private final ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
private final ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
private final QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
@Param({"500", "1000", "5000"})
private int topicCount;
@Param({"10", "20", "50"})
private int partitionCount;
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;
@Setup(Level.Trial)
public void setup() {
initializeMetadataCache();
kafkaApis = createKafkaApis();
allTopicMetadataRequest = buildAllTopicMetadataRequest();
}
private void initializeMetadataCache() {
List<UpdateMetadataBroker> liveBrokers = new LinkedList<>();
List<UpdateMetadataPartitionState> partitionStates = new LinkedList<>();
IntStream.range(0, 5).forEach(brokerId -> liveBrokers.add(
new UpdateMetadataBroker().setId(brokerId)
.setEndpoints(endpoints(brokerId))
.setRack("rack1")));
IntStream.range(0, topicCount).forEach(topicId -> {
String topicName = "topic-" + topicId;
IntStream.range(0, partitionCount).forEach(partitionId -> {
partitionStates.add(
new UpdateMetadataPartitionState().setTopicName(topicName)
.setPartitionIndex(partitionId)
.setControllerEpoch(1)
.setLeader(partitionCount % 5)
.setLeaderEpoch(0)
.setIsr(Arrays.asList(0, 1, 3))
.setZkVersion(1)
.setReplicas(Arrays.asList(0, 1, 3)));
});
});
UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(
ApiKeys.UPDATE_METADATA.latestVersion(),
1, 1, 1,
partitionStates, liveBrokers, Collections.emptyMap()).build();
metadataCache.updateMetadata(100, updateMetadataRequest);
}
private List<UpdateMetadataEndpoint> endpoints(final int brokerId) {
return Collections.singletonList(
new UpdateMetadataEndpoint()
.setHost("host_" + brokerId)
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value()));
}
private KafkaApis createKafkaApis() {
Properties kafkaProps = new Properties();
kafkaProps.put(ZkConfigs.ZK_CONNECT_CONFIG, "zk");
kafkaProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId + "");
KafkaConfig config = new KafkaConfig(kafkaProps);
return new KafkaApisBuilder().
setRequestChannel(requestChannel).
setMetadataSupport(new ZkSupport(adminManager, kafkaController, kafkaZkClient,
Option.empty(), metadataCache, new ZkBrokerEpochManager(metadataCache, kafkaController, Option.empty()))).
setReplicaManager(replicaManager).
setGroupCoordinator(groupCoordinator).
setTxnCoordinator(transactionCoordinator).
setAutoTopicCreationManager(autoTopicCreationManager).
setBrokerId(brokerId).
setConfig(config).
setConfigRepository(new MockConfigRepository()).
setMetadataCache(metadataCache).
setMetrics(metrics).
setAuthorizer(Optional.empty()).
setQuotas(quotaManagers).
setFetchManager(fetchManager).
setSharePartitionManager(Optional.empty()).
setBrokerTopicStats(brokerTopicStats).
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(
ApiMessageType.ListenerType.ZK_BROKER,
false,
() -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
build();
}
@TearDown(Level.Trial)
public void tearDown() {
kafkaApis.close();
metrics.close();
}
private RequestChannel.Request buildAllTopicMetadataRequest() {
MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build();
RequestHeader header = new RequestHeader(metadataRequest.apiKey(), metadataRequest.version(), "", 0);
ByteBuffer bodyBuffer = metadataRequest.serialize();
RequestContext context = new RequestContext(header, "1", null, principal,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty());
}
@Benchmark
public void testMetadataRequestForAllTopics() {
kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest);
}
@Benchmark
public String testRequestToJson() {
Option<com.fasterxml.jackson.databind.JsonNode> option = allTopicMetadataRequest.requestLog();
Optional<com.fasterxml.jackson.databind.JsonNode> optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty();
return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString();
}
@Benchmark
public void testTopicIdInfo() {
metadataCache.topicIdInfo();
}
}

CheckpointBench.java

@@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.KafkaScheduler;
@@ -67,6 +66,8 @@ import java.util.stream.Collectors;
 import scala.Option;
 import scala.collection.JavaConverters;

+import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1;
+
 @Warmup(iterations = 5)
 @Measurement(iterations = 5)
 @Fork(3)
@@ -102,7 +103,7 @@ public class CheckpointBench {
     public void setup() {
         this.scheduler = new KafkaScheduler(1, true, "scheduler-thread");
         this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
-                0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
+                0, null, true, true, 9092, Option.empty(), Option.empty(),
                 Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
                 (short) 1, false));
         this.metrics = new Metrics();
@@ -116,9 +117,7 @@
         scheduler.startup();
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
         final MetadataCache metadataCache =
-                MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
-                        this.brokerProperties.interBrokerProtocolVersion(),
-                        BrokerFeatures.createEmpty(), false);
+                MetadataCache.kRaftMetadataCache(this.brokerProperties.brokerId(), () -> KRAFT_VERSION_1);
         this.quotaManagers =
                 QuotaFactory.instantiate(this.brokerProperties,
                         this.metrics,
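This benchmark and PartitionCreationBench below make the same two substitutions: TestUtils.createBrokerConfig receives null instead of a mock ZooKeeper connect string, and the ZK-backed metadata cache becomes a KRaft one pinned to KRAFT_VERSION_1. The sketch below isolates the cache construction; the wrapper class is illustrative, not part of the patch.

import kafka.server.MetadataCache;

import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1;

final class KRaftCacheFactorySketch {
    private KRaftCacheFactorySketch() { }

    // Replaces zkMetadataCache(brokerId, ibpVersion, brokerFeatures, zkMigrationEnabled):
    // the only remaining inputs are the broker id and a lazily-read KRaftVersion.
    static MetadataCache forBroker(int brokerId) {
        return MetadataCache.kRaftMetadataCache(brokerId, () -> KRAFT_VERSION_1);
    }
}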

PartitionCreationBench.java

@@ -28,7 +28,6 @@ import kafka.server.builders.ReplicaManagerBuilder;
 import kafka.server.metadata.ConfigRepository;
 import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.TestUtils;
-import kafka.zk.KafkaZkClient;

 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -36,7 +35,6 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.Scheduler;
@@ -73,6 +71,8 @@ import java.util.stream.Collectors;
 import scala.Option;
 import scala.collection.JavaConverters;

+import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1;
+
 @Warmup(iterations = 5)
 @Measurement(iterations = 5)
 @Fork(3)
@@ -82,7 +82,7 @@ public class PartitionCreationBench {
     @Param({"false", "true"})
     public boolean useTopicIds;

-    @Param({"2000"})
+    @Param({"20"})
     public int numPartitions;

     private final String topicName = "foo";
@@ -95,7 +95,6 @@
     private ReplicaManager replicaManager;
     private QuotaFactory.QuotaManagers quotaManagers;
-    private KafkaZkClient zkClient;
     private LogDirFailureChannel failureChannel;
     private LogManager logManager;
     private AlterPartitionManager alterPartitionManager;
@@ -111,7 +110,7 @@
         this.scheduler = new KafkaScheduler(1, true, "scheduler-thread");
         this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
-                0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
+                0, null, true, true, 9092, Option.empty(), Option.empty(),
                 Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
                 (short) 1, false));
         this.metrics = new Metrics();
@@ -147,25 +146,16 @@
                 build();
         scheduler.startup();
         this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, this.time, "");
-        this.zkClient = new KafkaZkClient(null, false, Time.SYSTEM, false) {
-            @Override
-            public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
-                return new Properties();
-            }
-        };
         this.alterPartitionManager = TestUtils.createAlterIsrManager();
         this.replicaManager = new ReplicaManagerBuilder().
                 setConfig(brokerProperties).
                 setMetrics(metrics).
                 setTime(time).
-                setZkClient(zkClient).
                 setScheduler(scheduler).
                 setLogManager(logManager).
                 setQuotaManagers(quotaManagers).
                 setBrokerTopicStats(brokerTopicStats).
-                setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
-                        this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(),
-                        false)).
+                setMetadataCache(MetadataCache.kRaftMetadataCache(this.brokerProperties.brokerId(), () -> KRAFT_VERSION_1)).
                 setLogDirFailureChannel(failureChannel).
                 setAlterPartitionManager(alterPartitionManager).
                 build();
@@ -183,7 +173,6 @@
         for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
             Utils.delete(dir);
         }
-        this.zkClient.close();
     }

     private static LogConfig createLogConfig() {
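These are standard JMH benchmarks, so the smaller numPartitions default can still be overridden per run. Below is a sketch of a programmatic run; the runner class name and parameter values are illustrative (Kafka's jmh-benchmarks module also ships a jmh.sh wrapper for the same purpose).

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RunBenchmarkSketch {
    public static void main(String[] args) throws RunnerException {
        Options opts = new OptionsBuilder()
                // Regex over benchmark class names; matches the class above.
                .include("PartitionCreationBench")
                // Override the @Param default, e.g. to compare partition counts.
                .param("numPartitions", "20", "200")
                .build();
        new Runner(opts).run();
    }
}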