diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java index 3002c579ba9..051d4d7724f 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java @@ -17,9 +17,6 @@ package org.apache.kafka.jmh.acl; -import kafka.security.authorizer.AclAuthorizer; -import kafka.security.authorizer.AclAuthorizer.VersionedAcls; - import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; @@ -40,7 +37,6 @@ import org.apache.kafka.metadata.authorizer.StandardAcl; import org.apache.kafka.metadata.authorizer.StandardAuthorizer; import org.apache.kafka.security.authorizer.AclEntry; import org.apache.kafka.server.authorizer.Action; -import org.apache.kafka.server.authorizer.Authorizer; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -66,11 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import scala.collection.JavaConverters; @State(Scope.Benchmark) @Fork(value = 1) @@ -80,21 +72,6 @@ import scala.collection.JavaConverters; @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AuthorizerBenchmark { - public enum AuthorizerType { - ACL(AclAuthorizer::new), - KRAFT(StandardAuthorizer::new); - - private final Supplier supplier; - - AuthorizerType(Supplier supplier) { - this.supplier = supplier; - } - - Authorizer newAuthorizer() { - return supplier.get(); - } - } - @Param({"10000", "50000", "200000"}) private int resourceCount; //no. of. rules per resource @@ -104,15 +81,11 @@ public class AuthorizerBenchmark { @Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"}) private double denyPercentage; - @Param({"ACL", "KRAFT"}) - private AuthorizerType authorizerType; - - private final int hostPreCount = 1000; + private final int hostPreCount = 10; private final String resourceNamePrefix = "foo-bar35_resource-"; private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); private final String authorizeByResourceTypeHostName = "127.0.0.2"; - private final HashMap aclToUpdate = new HashMap<>(); - private Authorizer authorizer; + private StandardAuthorizer authorizer; private List actions = new ArrayList<>(); private RequestContext authorizeContext; private RequestContext authorizeByResourceTypeContext; @@ -122,13 +95,12 @@ public class AuthorizerBenchmark { @Setup(Level.Trial) public void setup() throws Exception { - authorizer = authorizerType.newAuthorizer(); + authorizer = new StandardAuthorizer(); prepareAclCache(); - prepareAclToUpdate(); // By adding `-95` to the resource name prefix, we cause the `TreeMap.from/to` call to return // most map entries. In such cases, we rely on the filtering based on `String.startsWith` // to return the matching ACLs. Using a more efficient data structure (e.g. a prefix - // tree) should improve performance significantly). + // tree) should improve performance significantly. actions = Collections.singletonList(new Action(AclOperation.WRITE, new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95, PatternType.LITERAL), 1, true, true)); @@ -212,43 +184,16 @@ public class AuthorizerBenchmark { private void setupAcls(Map> aclEntries) { for (Map.Entry> entryMap : aclEntries.entrySet()) { ResourcePattern resourcePattern = entryMap.getKey(); - switch (authorizerType) { - case ACL: - ((AclAuthorizer) authorizer).updateCache(resourcePattern, - new VersionedAcls(JavaConverters.asScalaSetConverter(entryMap.getValue()).asScala().toSet(), 1)); - break; - case KRAFT: - for (AclEntry aclEntry : entryMap.getValue()) { - StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, aclEntry)); - ((StandardAuthorizer) authorizer).addAcl(Uuid.randomUuid(), standardAcl); - - } - ((StandardAuthorizer) authorizer).completeInitialLoad(); - break; + for (AclEntry aclEntry : entryMap.getValue()) { + StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, aclEntry)); + authorizer.addAcl(Uuid.randomUuid(), standardAcl); } - } - } + authorizer.completeInitialLoad(); - private void prepareAclToUpdate() { - scala.collection.mutable.Set entries = new scala.collection.mutable.HashSet<>(); - for (int i = 0; i < resourceCount; i++) { - scala.collection.immutable.Set immutable = new scala.collection.immutable.HashSet<>(); - for (int j = 0; j < aclCount; j++) { - entries.add(new AclEntry(new AccessControlEntry( - principal.toString(), "127.0.0" + j, AclOperation.WRITE, AclPermissionType.ALLOW))); - immutable = entries.toSet(); - } - aclToUpdate.put( - new ResourcePattern(ResourceType.TOPIC, randomResourceName(resourceNamePrefix), PatternType.LITERAL), - new AclAuthorizer.VersionedAcls(immutable, i)); } } - private String randomResourceName(String prefix) { - return prefix + UUID.randomUUID().toString().substring(0, 5); - } - private Boolean shouldDeny() { return rand.nextDouble() * 100.0 - eps < denyPercentage; } @@ -272,13 +217,4 @@ public class AuthorizerBenchmark { public void testAuthorizeByResourceType() { authorizer.authorizeByResourceType(authorizeByResourceTypeContext, AclOperation.READ, ResourceType.TOPIC); } - - @Benchmark - public void testUpdateCache() { - if (authorizerType == AuthorizerType.ACL) { - for (Map.Entry e : aclToUpdate.entrySet()) { - ((AclAuthorizer) authorizer).updateCache(e.getKey(), e.getValue()); - } - } - } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java deleted file mode 100644 index 9a5fadd828e..00000000000 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.jmh.metadata; - -import kafka.controller.KafkaController; -import kafka.coordinator.transaction.TransactionCoordinator; -import kafka.network.RequestChannel; -import kafka.server.AutoTopicCreationManager; -import kafka.server.ClientQuotaManager; -import kafka.server.ClientRequestQuotaManager; -import kafka.server.ControllerMutationQuotaManager; -import kafka.server.FetchManager; -import kafka.server.KafkaApis; -import kafka.server.KafkaConfig; -import kafka.server.MetadataCache; -import kafka.server.QuotaFactory; -import kafka.server.ReplicaManager; -import kafka.server.ReplicationQuotaManager; -import kafka.server.SimpleApiVersionManager; -import kafka.server.ZkAdminManager; -import kafka.server.ZkBrokerEpochManager; -import kafka.server.ZkSupport; -import kafka.server.builders.KafkaApisBuilder; -import kafka.server.metadata.MockConfigRepository; -import kafka.server.metadata.ZkMetadataCache; -import kafka.zk.KafkaZkClient; - -import org.apache.kafka.common.memory.MemoryPool; -import org.apache.kafka.common.message.ApiMessageType; -import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker; -import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint; -import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.network.ClientInformation; -import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.requests.MetadataRequest; -import org.apache.kafka.common.requests.RequestContext; -import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.requests.UpdateMetadataRequest; -import org.apache.kafka.common.security.auth.KafkaPrincipal; -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.coordinator.group.GroupCoordinator; -import org.apache.kafka.network.RequestConvertToJson; -import org.apache.kafka.network.metrics.RequestChannelMetrics; -import org.apache.kafka.server.BrokerFeatures; -import org.apache.kafka.server.common.FinalizedFeatures; -import org.apache.kafka.server.common.MetadataVersion; -import org.apache.kafka.server.config.ServerConfigs; -import org.apache.kafka.server.config.ZkConfigs; -import org.apache.kafka.storage.log.metrics.BrokerTopicStats; - -import org.mockito.Mockito; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; - -import scala.Option; - -@State(Scope.Benchmark) -@Fork(value = 1) -@Warmup(iterations = 5) -@Measurement(iterations = 15) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.NANOSECONDS) - -public class MetadataRequestBenchmark { - private final RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly()); - private final RequestChannelMetrics requestChannelMetrics = Mockito.mock(RequestChannelMetrics.class); - private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); - private final GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class); - private final ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class); - private final TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class); - private final KafkaController kafkaController = Mockito.mock(KafkaController.class); - private final AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class); - private final KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class); - private final Metrics metrics = new Metrics(); - private final int brokerId = 1; - private final ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId, - MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), false); - private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); - private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); - private final ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class); - private final ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class); - private final QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager, - clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager, - replicaQuotaManager, replicaQuotaManager, Option.empty()); - private final FetchManager fetchManager = Mockito.mock(FetchManager.class); - private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); - private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); - @Param({"500", "1000", "5000"}) - private int topicCount; - @Param({"10", "20", "50"}) - private int partitionCount; - private KafkaApis kafkaApis; - private RequestChannel.Request allTopicMetadataRequest; - - @Setup(Level.Trial) - public void setup() { - initializeMetadataCache(); - kafkaApis = createKafkaApis(); - allTopicMetadataRequest = buildAllTopicMetadataRequest(); - } - - private void initializeMetadataCache() { - List liveBrokers = new LinkedList<>(); - List partitionStates = new LinkedList<>(); - - IntStream.range(0, 5).forEach(brokerId -> liveBrokers.add( - new UpdateMetadataBroker().setId(brokerId) - .setEndpoints(endpoints(brokerId)) - .setRack("rack1"))); - - IntStream.range(0, topicCount).forEach(topicId -> { - String topicName = "topic-" + topicId; - - IntStream.range(0, partitionCount).forEach(partitionId -> { - partitionStates.add( - new UpdateMetadataPartitionState().setTopicName(topicName) - .setPartitionIndex(partitionId) - .setControllerEpoch(1) - .setLeader(partitionCount % 5) - .setLeaderEpoch(0) - .setIsr(Arrays.asList(0, 1, 3)) - .setZkVersion(1) - .setReplicas(Arrays.asList(0, 1, 3))); - }); - }); - - UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder( - ApiKeys.UPDATE_METADATA.latestVersion(), - 1, 1, 1, - partitionStates, liveBrokers, Collections.emptyMap()).build(); - metadataCache.updateMetadata(100, updateMetadataRequest); - } - - private List endpoints(final int brokerId) { - return Collections.singletonList( - new UpdateMetadataEndpoint() - .setHost("host_" + brokerId) - .setPort(9092) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) - .setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value())); - } - - private KafkaApis createKafkaApis() { - Properties kafkaProps = new Properties(); - kafkaProps.put(ZkConfigs.ZK_CONNECT_CONFIG, "zk"); - kafkaProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId + ""); - KafkaConfig config = new KafkaConfig(kafkaProps); - return new KafkaApisBuilder(). - setRequestChannel(requestChannel). - setMetadataSupport(new ZkSupport(adminManager, kafkaController, kafkaZkClient, - Option.empty(), metadataCache, new ZkBrokerEpochManager(metadataCache, kafkaController, Option.empty()))). - setReplicaManager(replicaManager). - setGroupCoordinator(groupCoordinator). - setTxnCoordinator(transactionCoordinator). - setAutoTopicCreationManager(autoTopicCreationManager). - setBrokerId(brokerId). - setConfig(config). - setConfigRepository(new MockConfigRepository()). - setMetadataCache(metadataCache). - setMetrics(metrics). - setAuthorizer(Optional.empty()). - setQuotas(quotaManagers). - setFetchManager(fetchManager). - setSharePartitionManager(Optional.empty()). - setBrokerTopicStats(brokerTopicStats). - setClusterId("clusterId"). - setTime(Time.SYSTEM). - setTokenManager(null). - setApiVersionManager(new SimpleApiVersionManager( - ApiMessageType.ListenerType.ZK_BROKER, - false, - () -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))). - build(); - } - - @TearDown(Level.Trial) - public void tearDown() { - kafkaApis.close(); - metrics.close(); - } - - private RequestChannel.Request buildAllTopicMetadataRequest() { - MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build(); - RequestHeader header = new RequestHeader(metadataRequest.apiKey(), metadataRequest.version(), "", 0); - ByteBuffer bodyBuffer = metadataRequest.serialize(); - - RequestContext context = new RequestContext(header, "1", null, principal, - ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), - SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false); - return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty()); - } - - @Benchmark - public void testMetadataRequestForAllTopics() { - kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest); - } - - @Benchmark - public String testRequestToJson() { - Option option = allTopicMetadataRequest.requestLog(); - Optional optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty(); - return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString(); - } - - @Benchmark - public void testTopicIdInfo() { - metadataCache.topicIdInfo(); - } -} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index 5de1fd9a459..06d2fe7bade 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.BrokerFeatures; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.util.KafkaScheduler; @@ -67,6 +66,8 @@ import java.util.stream.Collectors; import scala.Option; import scala.collection.JavaConverters; +import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1; + @Warmup(iterations = 5) @Measurement(iterations = 5) @Fork(3) @@ -102,7 +103,7 @@ public class CheckpointBench { public void setup() { this.scheduler = new KafkaScheduler(1, true, "scheduler-thread"); this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( - 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(), + 0, null, true, true, 9092, Option.empty(), Option.empty(), Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1, false)); this.metrics = new Metrics(); @@ -116,9 +117,7 @@ public class CheckpointBench { scheduler.startup(); final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); final MetadataCache metadataCache = - MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), - this.brokerProperties.interBrokerProtocolVersion(), - BrokerFeatures.createEmpty(), false); + MetadataCache.kRaftMetadataCache(this.brokerProperties.brokerId(), () -> KRAFT_VERSION_1); this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index f572c43250c..968fdcb27bf 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -28,7 +28,6 @@ import kafka.server.builders.ReplicaManagerBuilder; import kafka.server.metadata.ConfigRepository; import kafka.server.metadata.MockConfigRepository; import kafka.utils.TestUtils; -import kafka.zk.KafkaZkClient; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -36,7 +35,6 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.BrokerFeatures; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.Scheduler; @@ -73,6 +71,8 @@ import java.util.stream.Collectors; import scala.Option; import scala.collection.JavaConverters; +import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1; + @Warmup(iterations = 5) @Measurement(iterations = 5) @Fork(3) @@ -82,7 +82,7 @@ public class PartitionCreationBench { @Param({"false", "true"}) public boolean useTopicIds; - @Param({"2000"}) + @Param({"20"}) public int numPartitions; private final String topicName = "foo"; @@ -95,7 +95,6 @@ public class PartitionCreationBench { private ReplicaManager replicaManager; private QuotaFactory.QuotaManagers quotaManagers; - private KafkaZkClient zkClient; private LogDirFailureChannel failureChannel; private LogManager logManager; private AlterPartitionManager alterPartitionManager; @@ -111,7 +110,7 @@ public class PartitionCreationBench { this.scheduler = new KafkaScheduler(1, true, "scheduler-thread"); this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( - 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(), + 0, null, true, true, 9092, Option.empty(), Option.empty(), Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1, false)); this.metrics = new Metrics(); @@ -147,25 +146,16 @@ public class PartitionCreationBench { build(); scheduler.startup(); this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, this.time, ""); - this.zkClient = new KafkaZkClient(null, false, Time.SYSTEM, false) { - @Override - public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) { - return new Properties(); - } - }; this.alterPartitionManager = TestUtils.createAlterIsrManager(); this.replicaManager = new ReplicaManagerBuilder(). setConfig(brokerProperties). setMetrics(metrics). setTime(time). - setZkClient(zkClient). setScheduler(scheduler). setLogManager(logManager). setQuotaManagers(quotaManagers). setBrokerTopicStats(brokerTopicStats). - setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), - this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), - false)). + setMetadataCache(MetadataCache.kRaftMetadataCache(this.brokerProperties.brokerId(), () -> KRAFT_VERSION_1)). setLogDirFailureChannel(failureChannel). setAlterPartitionManager(alterPartitionManager). build(); @@ -183,7 +173,6 @@ public class PartitionCreationBench { for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) { Utils.delete(dir); } - this.zkClient.close(); } private static LogConfig createLogConfig() {