MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct (#11320)

The ReplicaManager, LogManager, and KafkaApis classes all have many
constructor parameters. It is often difficult to add or remove a
parameter, since there are so many call sites that need to be updated. To
address this problem, we should use named parameters when constructing
these objects from Scala code. This makes it easy to add new optional
parameters without modifying many test cases. It also makes git diffs
and PRs easier to read, since each argument appears next to its
parameter name. Since Java does not support named parameters, this PR
adds several Builder classes that achieve the same effect.
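
As an illustration, here is a minimal, self-contained Scala sketch (the
Widget class is hypothetical, not part of this PR) of why named parameters
keep call sites stable when a constructor grows:

// Hypothetical example class; not part of the Kafka codebase.
class Widget(val name: String,
             val retries: Int = 3,
             val verbose: Boolean = false) // newly added optional parameter

object WidgetExample extends App {
  // Named arguments document each value at the call site, so adding
  // `verbose` above does not require touching this caller.
  val w = new Widget(name = "w1", retries = 5)
  println(s"${w.name} retries=${w.retries} verbose=${w.verbose}")
}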

ReplicaManager also had a secondary constructor, which this PR removes.
That constructor existed only to supply default values for the main
constructor's parameters; it is simpler to use default parameter values
directly.
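
The pattern looks roughly like this (a hedged sketch with hypothetical
class names, not the actual ReplicaManager signature):

// Before: a secondary constructor whose only job is to fill in defaults.
class Before(val config: String, val prefix: Option[String]) {
  def this(config: String) = this(config, None)
}

// After: default parameter values give callers the same convenience
// without a second constructor to keep in sync.
class After(val config: String, val prefix: Option[String] = None)

object DefaultsExample extends App {
  println(new Before("c").prefix) // None
  println(new After("c").prefix)  // None
}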

Reviewers: David Arthur <mumrah@gmail.com>
Colin Patrick McCabe, 2021-09-17 14:12:31 -07:00 (committed by GitHub)
commit 074a3dacca (parent 1b0294dfc4)
21 changed files with 1018 additions and 289 deletions


@@ -72,6 +72,13 @@
         <allow pkg="org.apache.kafka.clients" />
     </subpackage>
+    <subpackage name="server">
+        <subpackage name="builders">
+            <allow pkg="kafka" />
+            <allow pkg="org.apache.kafka" />
+        </subpackage>
+    </subpackage>
     <subpackage name="test">
         <allow pkg="org.apache.kafka.controller"/>
         <allow pkg="org.apache.kafka.metadata"/>


@@ -23,7 +23,7 @@
     <!-- core -->
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
              files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
-    <suppress checks="NPathComplexity" files="ClusterTestExtensions.java"/>
+    <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <!-- Clients -->
     <suppress id="dontUseSystemExit"


@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.builders;
import kafka.coordinator.group.GroupCoordinator;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager;
import kafka.server.AutoTopicCreationManager;
import kafka.server.BrokerTopicStats;
import kafka.server.DelegationTokenManager;
import kafka.server.FetchManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.MetadataSupport;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Collections;
import java.util.Optional;
import scala.compat.java8.OptionConverters;
public class KafkaApisBuilder {
private RequestChannel requestChannel = null;
private MetadataSupport metadataSupport = null;
private ReplicaManager replicaManager = null;
private GroupCoordinator groupCoordinator = null;
private TransactionCoordinator txnCoordinator = null;
private AutoTopicCreationManager autoTopicCreationManager = null;
private int brokerId = 0;
private KafkaConfig config = null;
private ConfigRepository configRepository = null;
private MetadataCache metadataCache = null;
private Metrics metrics = null;
private Optional<Authorizer> authorizer = Optional.empty();
private QuotaManagers quotas = null;
private FetchManager fetchManager = null;
private BrokerTopicStats brokerTopicStats = null;
private String clusterId = "clusterId";
private Time time = Time.SYSTEM;
private DelegationTokenManager tokenManager = null;
private ApiVersionManager apiVersionManager = null;
public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
this.requestChannel = requestChannel;
return this;
}
public KafkaApisBuilder setMetadataSupport(MetadataSupport metadataSupport) {
this.metadataSupport = metadataSupport;
return this;
}
public KafkaApisBuilder setReplicaManager(ReplicaManager replicaManager) {
this.replicaManager = replicaManager;
return this;
}
public KafkaApisBuilder setGroupCoordinator(GroupCoordinator groupCoordinator) {
this.groupCoordinator = groupCoordinator;
return this;
}
public KafkaApisBuilder setTxnCoordinator(TransactionCoordinator txnCoordinator) {
this.txnCoordinator = txnCoordinator;
return this;
}
public KafkaApisBuilder setAutoTopicCreationManager(AutoTopicCreationManager autoTopicCreationManager) {
this.autoTopicCreationManager = autoTopicCreationManager;
return this;
}
public KafkaApisBuilder setBrokerId(int brokerId) {
this.brokerId = brokerId;
return this;
}
public KafkaApisBuilder setConfig(KafkaConfig config) {
this.config = config;
return this;
}
public KafkaApisBuilder setConfigRepository(ConfigRepository configRepository) {
this.configRepository = configRepository;
return this;
}
public KafkaApisBuilder setMetadataCache(MetadataCache metadataCache) {
this.metadataCache = metadataCache;
return this;
}
public KafkaApisBuilder setMetrics(Metrics metrics) {
this.metrics = metrics;
return this;
}
public KafkaApisBuilder setAuthorizer(Optional<Authorizer> authorizer) {
this.authorizer = authorizer;
return this;
}
public KafkaApisBuilder setQuotas(QuotaManagers quotas) {
this.quotas = quotas;
return this;
}
public KafkaApisBuilder setFetchManager(FetchManager fetchManager) {
this.fetchManager = fetchManager;
return this;
}
public KafkaApisBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicStats) {
this.brokerTopicStats = brokerTopicStats;
return this;
}
public KafkaApisBuilder setClusterId(String clusterId) {
this.clusterId = clusterId;
return this;
}
public KafkaApisBuilder setTime(Time time) {
this.time = time;
return this;
}
public KafkaApisBuilder setTokenManager(DelegationTokenManager tokenManager) {
this.tokenManager = tokenManager;
return this;
}
public KafkaApisBuilder setApiVersionManager(ApiVersionManager apiVersionManager) {
this.apiVersionManager = apiVersionManager;
return this;
}
public KafkaApis build() {
if (requestChannel == null) throw new RuntimeException("You must set requestChannel");
if (metadataSupport == null) throw new RuntimeException("You must set metadataSupport");
if (replicaManager == null) throw new RuntimeException("You must set replicaManager");
if (groupCoordinator == null) throw new RuntimeException("You must set groupCoordinator");
if (txnCoordinator == null) throw new RuntimeException("You must set txnCoordinator");
if (autoTopicCreationManager == null)
throw new RuntimeException("You must set autoTopicCreationManager");
if (config == null) config = new KafkaConfig(Collections.emptyMap());
if (configRepository == null) throw new RuntimeException("You must set configRepository");
if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
if (metrics == null) throw new RuntimeException("You must set metrics");
if (quotas == null) throw new RuntimeException("You must set quotas");
if (fetchManager == null) throw new RuntimeException("You must set fetchManager");
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats();
if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager");
return new KafkaApis(requestChannel,
metadataSupport,
replicaManager,
groupCoordinator,
txnCoordinator,
autoTopicCreationManager,
brokerId,
config,
configRepository,
metadataCache,
metrics,
OptionConverters.toScala(authorizer),
quotas,
fetchManager,
brokerTopicStats,
clusterId,
time,
tokenManager,
apiVersionManager);
}
}


@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.builders;
import kafka.api.ApiVersion;
import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.metadata.ConfigRepository;
import kafka.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import java.io.File;
import java.util.Collections;
import java.util.List;
import scala.collection.JavaConverters;
public class LogManagerBuilder {
private List<File> logDirs = null;
private List<File> initialOfflineDirs = Collections.emptyList();
private ConfigRepository configRepository = null;
private LogConfig initialDefaultConfig = null;
private CleanerConfig cleanerConfig = null;
private int recoveryThreadsPerDataDir = 1;
private long flushCheckMs = 1000L;
private long flushRecoveryOffsetCheckpointMs = 10000L;
private long flushStartOffsetCheckpointMs = 10000L;
private long retentionCheckMs = 1000L;
private int maxPidExpirationMs = 60000;
private ApiVersion interBrokerProtocolVersion = ApiVersion.latestVersion();
private Scheduler scheduler = null;
private BrokerTopicStats brokerTopicStats = null;
private LogDirFailureChannel logDirFailureChannel = null;
private Time time = Time.SYSTEM;
private boolean keepPartitionMetadataFile = true;
public LogManagerBuilder setLogDirs(List<File> logDirs) {
this.logDirs = logDirs;
return this;
}
public LogManagerBuilder setInitialOfflineDirs(List<File> initialOfflineDirs) {
this.initialOfflineDirs = initialOfflineDirs;
return this;
}
public LogManagerBuilder setConfigRepository(ConfigRepository configRepository) {
this.configRepository = configRepository;
return this;
}
public LogManagerBuilder setInitialDefaultConfig(LogConfig initialDefaultConfig) {
this.initialDefaultConfig = initialDefaultConfig;
return this;
}
public LogManagerBuilder setCleanerConfig(CleanerConfig cleanerConfig) {
this.cleanerConfig = cleanerConfig;
return this;
}
public LogManagerBuilder setRecoveryThreadsPerDataDir(int recoveryThreadsPerDataDir) {
this.recoveryThreadsPerDataDir = recoveryThreadsPerDataDir;
return this;
}
public LogManagerBuilder setFlushCheckMs(long flushCheckMs) {
this.flushCheckMs = flushCheckMs;
return this;
}
public LogManagerBuilder setFlushRecoveryOffsetCheckpointMs(long flushRecoveryOffsetCheckpointMs) {
this.flushRecoveryOffsetCheckpointMs = flushRecoveryOffsetCheckpointMs;
return this;
}
public LogManagerBuilder setFlushStartOffsetCheckpointMs(long flushStartOffsetCheckpointMs) {
this.flushStartOffsetCheckpointMs = flushStartOffsetCheckpointMs;
return this;
}
public LogManagerBuilder setRetentionCheckMs(long retentionCheckMs) {
this.retentionCheckMs = retentionCheckMs;
return this;
}
public LogManagerBuilder setMaxPidExpirationMs(int maxPidExpirationMs) {
this.maxPidExpirationMs = maxPidExpirationMs;
return this;
}
public LogManagerBuilder setInterBrokerProtocolVersion(ApiVersion interBrokerProtocolVersion) {
this.interBrokerProtocolVersion = interBrokerProtocolVersion;
return this;
}
public LogManagerBuilder setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
return this;
}
public LogManagerBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicStats) {
this.brokerTopicStats = brokerTopicStats;
return this;
}
public LogManagerBuilder setLogDirFailureChannel(LogDirFailureChannel logDirFailureChannel) {
this.logDirFailureChannel = logDirFailureChannel;
return this;
}
public LogManagerBuilder setTime(Time time) {
this.time = time;
return this;
}
public LogManagerBuilder setKeepPartitionMetadataFile(boolean keepPartitionMetadataFile) {
this.keepPartitionMetadataFile = keepPartitionMetadataFile;
return this;
}
public LogManager build() {
if (logDirs == null) throw new RuntimeException("you must set logDirs");
if (configRepository == null) throw new RuntimeException("you must set configRepository");
if (initialDefaultConfig == null) throw new RuntimeException("you must set initialDefaultConfig");
if (cleanerConfig == null) throw new RuntimeException("you must set cleanerConfig");
if (scheduler == null) throw new RuntimeException("you must set scheduler");
if (brokerTopicStats == null) throw new RuntimeException("you must set brokerTopicStats");
if (logDirFailureChannel == null) throw new RuntimeException("you must set logDirFailureChannel");
return new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
JavaConverters.asScalaIteratorConverter(initialOfflineDirs.iterator()).asScala().toSeq(),
configRepository,
initialDefaultConfig,
cleanerConfig,
recoveryThreadsPerDataDir,
flushCheckMs,
flushRecoveryOffsetCheckpointMs,
flushStartOffsetCheckpointMs,
retentionCheckMs,
maxPidExpirationMs,
interBrokerProtocolVersion,
scheduler,
brokerTopicStats,
logDirFailureChannel,
time,
keepPartitionMetadataFile);
}
}


@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.builders;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
import kafka.server.BrokerTopicStats;
import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.utils.Scheduler;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import scala.compat.java8.OptionConverters;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReplicaManagerBuilder {
private KafkaConfig config = null;
private Metrics metrics = null;
private Time time = Time.SYSTEM;
private Scheduler scheduler = null;
private LogManager logManager = null;
private QuotaManagers quotaManagers = null;
private MetadataCache metadataCache = null;
private LogDirFailureChannel logDirFailureChannel = null;
private AlterIsrManager alterIsrManager = null;
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private Optional<KafkaZkClient> zkClient = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<String> threadNamePrefix = Optional.empty();
public ReplicaManagerBuilder setConfig(KafkaConfig config) {
this.config = config;
return this;
}
public ReplicaManagerBuilder setMetrics(Metrics metrics) {
this.metrics = metrics;
return this;
}
public ReplicaManagerBuilder setTime(Time time) {
this.time = time;
return this;
}
public ReplicaManagerBuilder setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
return this;
}
public ReplicaManagerBuilder setLogManager(LogManager logManager) {
this.logManager = logManager;
return this;
}
public ReplicaManagerBuilder setQuotaManagers(QuotaManagers quotaManagers) {
this.quotaManagers = quotaManagers;
return this;
}
public ReplicaManagerBuilder setMetadataCache(MetadataCache metadataCache) {
this.metadataCache = metadataCache;
return this;
}
public ReplicaManagerBuilder setLogDirFailureChannel(LogDirFailureChannel logDirFailureChannel) {
this.logDirFailureChannel = logDirFailureChannel;
return this;
}
public ReplicaManagerBuilder setAlterIsrManager(AlterIsrManager alterIsrManager) {
this.alterIsrManager = alterIsrManager;
return this;
}
public ReplicaManagerBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicStats) {
this.brokerTopicStats = brokerTopicStats;
return this;
}
public ReplicaManagerBuilder setIsShuttingDown(AtomicBoolean isShuttingDown) {
this.isShuttingDown = isShuttingDown;
return this;
}
public ReplicaManagerBuilder setZkClient(KafkaZkClient zkClient) {
this.zkClient = Optional.of(zkClient);
return this;
}
public ReplicaManagerBuilder setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce> delayedProducePurgatory) {
this.delayedProducePurgatory = Optional.of(delayedProducePurgatory);
return this;
}
public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch> delayedFetchPurgatory) {
this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory);
return this;
}
public ReplicaManagerBuilder setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords> delayedDeleteRecordsPurgatory) {
this.delayedDeleteRecordsPurgatory = Optional.of(delayedDeleteRecordsPurgatory);
return this;
}
public ReplicaManagerBuilder setDelayedElectLeaderPurgatoryParam(DelayedOperationPurgatory<DelayedElectLeader> delayedElectLeaderPurgatory) {
this.delayedElectLeaderPurgatory = Optional.of(delayedElectLeaderPurgatory);
return this;
}
public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = Optional.of(threadNamePrefix);
return this;
}
public ReplicaManager build() {
if (config == null) config = new KafkaConfig(Collections.emptyMap());
if (metrics == null) metrics = new Metrics();
if (logManager == null) throw new RuntimeException("You must set logManager");
if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
if (alterIsrManager == null) throw new RuntimeException("You must set alterIsrManager");
return new ReplicaManager(config,
metrics,
time,
scheduler,
logManager,
quotaManagers,
metadataCache,
logDirFailureChannel,
alterIsrManager,
brokerTopicStats,
isShuttingDown,
OptionConverters.toScala(zkClient),
OptionConverters.toScala(delayedProducePurgatory),
OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(threadNamePrefix));
}
}


@@ -257,10 +257,20 @@ class BrokerServer(
     )
     alterIsrManager.start()
-    this._replicaManager = new ReplicaManager(config, metrics, time, None,
-      kafkaScheduler, logManager, isShuttingDown, quotaManagers,
-      brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
-      threadNamePrefix)
+    this._replicaManager = new ReplicaManager(
+      config = config,
+      metrics = metrics,
+      time = time,
+      scheduler = kafkaScheduler,
+      logManager = logManager,
+      quotaManagers = quotaManagers,
+      metadataCache = metadataCache,
+      logDirFailureChannel = logDirFailureChannel,
+      alterIsrManager = alterIsrManager,
+      brokerTopicStats = brokerTopicStats,
+      isShuttingDown = isShuttingDown,
+      zkClient = None,
+      threadNamePrefix = threadNamePrefix)

     /* start token manager */
     if (config.tokenAuthEnabled) {
@@ -368,10 +378,26 @@ class BrokerServer(
     // Create the request processor objects.
     val raftSupport = RaftSupport(forwardingManager, metadataCache)
-    dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
-      replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
-      config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
-      fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
+    dataPlaneRequestProcessor = new KafkaApis(
+      requestChannel = socketServer.dataPlaneRequestChannel,
+      metadataSupport = raftSupport,
+      replicaManager = replicaManager,
+      groupCoordinator = groupCoordinator,
+      txnCoordinator = transactionCoordinator,
+      autoTopicCreationManager = autoTopicCreationManager,
+      brokerId = config.nodeId,
+      config = config,
+      configRepository = metadataCache,
+      metadataCache = metadataCache,
+      metrics = metrics,
+      authorizer = authorizer,
+      quotas = quotaManagers,
+      fetchManager = fetchManager,
+      brokerTopicStats = brokerTopicStats,
+      clusterId = clusterId,
+      time = time,
+      tokenManager = tokenManager,
+      apiVersionManager = apiVersionManager)

     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
       socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,


@@ -21,6 +21,7 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
 import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException}
@@ -29,7 +30,7 @@ import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
 import kafka.log.LogManager
 import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
-import kafka.network.SocketServer
+import kafka.network.{RequestChannel, SocketServer}
 import kafka.security.CredentialProvider
 import kafka.server.metadata.ZkConfigRepository
 import kafka.utils._
@@ -386,18 +387,35 @@ class KafkaServer(
         /* start processing requests */
         val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
-        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
-          autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-          fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
+
+        def createKafkaApis(requestChannel: RequestChannel): KafkaApis = new KafkaApis(
+          requestChannel = requestChannel,
+          metadataSupport = zkSupport,
+          replicaManager = replicaManager,
+          groupCoordinator = groupCoordinator,
+          txnCoordinator = transactionCoordinator,
+          autoTopicCreationManager = autoTopicCreationManager,
+          brokerId = config.brokerId,
+          config = config,
+          configRepository = configRepository,
+          metadataCache = metadataCache,
+          metrics = metrics,
+          authorizer = authorizer,
+          quotas = quotaManagers,
+          fetchManager = fetchManager,
+          brokerTopicStats = brokerTopicStats,
+          clusterId = clusterId,
+          time = time,
+          tokenManager = tokenManager,
+          apiVersionManager = apiVersionManager)
+
+        dataPlaneRequestProcessor = createKafkaApis(socketServer.dataPlaneRequestChannel)

         dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
           config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

         socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
-          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
-            autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-            fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
+          controlPlaneRequestProcessor = createKafkaApis(controlPlaneRequestChannel)
           controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
             1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
         }
@@ -438,8 +456,20 @@
   }

   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
-    new ReplicaManager(config, metrics, time, Some(zkClient), kafkaScheduler, logManager, isShuttingDown, quotaManagers,
-      brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager)
+    new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = kafkaScheduler,
+      logManager = logManager,
+      quotaManagers = quotaManagers,
+      metadataCache = metadataCache,
+      logDirFailureChannel = logDirFailureChannel,
+      alterIsrManager = alterIsrManager,
+      brokerTopicStats = brokerTopicStats,
+      isShuttingDown = isShuttingDown,
+      zkClient = Some(zkClient),
+      threadNamePrefix = threadNamePrefix)
   }

   private def initZkClient(time: Time): Unit = {


@@ -186,49 +186,37 @@ object ReplicaManager {
 class ReplicaManager(val config: KafkaConfig,
                      metrics: Metrics,
                      time: Time,
-                     val zkClient: Option[KafkaZkClient],
                      scheduler: Scheduler,
                      val logManager: LogManager,
-                     val isShuttingDown: AtomicBoolean,
                      quotaManagers: QuotaManagers,
-                     val brokerTopicStats: BrokerTopicStats,
                      val metadataCache: MetadataCache,
                      logDirFailureChannel: LogDirFailureChannel,
-                     val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
-                     val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
-                     val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
-                     val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
-                     threadNamePrefix: Option[String],
-                     val alterIsrManager: AlterIsrManager) extends Logging with KafkaMetricsGroup {
+                     val alterIsrManager: AlterIsrManager,
+                     val brokerTopicStats: BrokerTopicStats = new BrokerTopicStats(),
+                     val isShuttingDown: AtomicBoolean = new AtomicBoolean(false),
+                     val zkClient: Option[KafkaZkClient] = None,
+                     delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None,
+                     delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None,
+                     delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
+                     delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
+                     threadNamePrefix: Option[String] = None,
+                     ) extends Logging with KafkaMetricsGroup {

-  def this(config: KafkaConfig,
-           metrics: Metrics,
-           time: Time,
-           zkClient: Option[KafkaZkClient],
-           scheduler: Scheduler,
-           logManager: LogManager,
-           isShuttingDown: AtomicBoolean,
-           quotaManagers: QuotaManagers,
-           brokerTopicStats: BrokerTopicStats,
-           metadataCache: MetadataCache,
-           logDirFailureChannel: LogDirFailureChannel,
-           alterIsrManager: AlterIsrManager,
-           threadNamePrefix: Option[String] = None) = {
-    this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
-      quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
-      DelayedOperationPurgatory[DelayedProduce](
-        purgatoryName = "Produce", brokerId = config.brokerId,
-        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
-      DelayedOperationPurgatory[DelayedFetch](
-        purgatoryName = "Fetch", brokerId = config.brokerId,
-        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
-      DelayedOperationPurgatory[DelayedDeleteRecords](
-        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
-        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
-      DelayedOperationPurgatory[DelayedElectLeader](
-        purgatoryName = "ElectLeader", brokerId = config.brokerId),
-      threadNamePrefix, alterIsrManager)
-  }
+  val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
+    DelayedOperationPurgatory[DelayedProduce](
+      purgatoryName = "Produce", brokerId = config.brokerId,
+      purgeInterval = config.producerPurgatoryPurgeIntervalRequests))
+  val delayedFetchPurgatory = delayedFetchPurgatoryParam.getOrElse(
+    DelayedOperationPurgatory[DelayedFetch](
+      purgatoryName = "Fetch", brokerId = config.brokerId,
+      purgeInterval = config.fetchPurgatoryPurgeIntervalRequests))
+  val delayedDeleteRecordsPurgatory = delayedDeleteRecordsPurgatoryParam.getOrElse(
+    DelayedOperationPurgatory[DelayedDeleteRecords](
+      purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+      purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests))
+  val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
+    DelayedOperationPurgatory[DelayedElectLeader](
+      purgatoryName = "ElectLeader", brokerId = config.brokerId))

   /* epoch of the controller that last changed the leader */
   @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch


@@ -40,7 +40,7 @@ import scala.jdk.CollectionConverters._
 class LogLoaderTest {
   var config: KafkaConfig = null
-  val brokerTopicStats = new BrokerTopicStats
+  val brokerTopicStats = new BrokerTopicStats()
   val maxProducerIdExpirationMs: Int = 60 * 60 * 1000
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
@@ -75,13 +75,25 @@
     // Create a LogManager with some overridden methods to facilitate interception of clean shutdown
     // flag and to inject a runtime error
-    def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = {
-      new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new MockConfigRepository(),
-        initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
-        flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
-        retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
-        brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
-        keepPartitionMetadataFile = config.usesTopicId, interBrokerProtocolVersion = config.interBrokerProtocolVersion) {
+    def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager =
+      new LogManager(
+        logDirs = logDirs.map(_.getAbsoluteFile),
+        initialOfflineDirs = Array.empty[File],
+        configRepository = new MockConfigRepository(),
+        initialDefaultConfig = logConfig,
+        cleanerConfig = CleanerConfig(enableCleaner = false),
+        recoveryThreadsPerDataDir = 4,
+        flushCheckMs = 1000L,
+        flushRecoveryOffsetCheckpointMs = 10000L,
+        flushStartOffsetCheckpointMs = 10000L,
+        retentionCheckMs = 1000L,
+        maxPidExpirationMs = 60 * 60 * 1000,
+        interBrokerProtocolVersion = config.interBrokerProtocolVersion,
+        scheduler = time.scheduler,
+        brokerTopicStats = new BrokerTopicStats(),
+        logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
+        time = time,
+        keepPartitionMetadataFile = config.usesTopicId) {

       override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
@@ -111,7 +123,6 @@
           producerStateManager, None, true)
       }
     }
-  }

     val cleanShutdownFile = new File(logDir, LogLoader.CleanShutdownFile)
     locally {


@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api._
 import org.junit.jupiter.api.Assertions._
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
-import java.util.concurrent.atomic.AtomicBoolean

 import kafka.cluster.Partition
 import kafka.server.metadata.MockConfigRepository
@@ -63,9 +62,16 @@
     val time = new MockTime
     val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, time, None, scheduler,
-      logManagers.head, new AtomicBoolean(false), quotaManager,
-      new BrokerTopicStats, MetadataCache.zkMetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = configs.head,
+      time = time,
+      scheduler = scheduler,
+      logManager = logManagers.head,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId),
+      logDirFailureChannel = logDirFailureChannels.head,
+      alterIsrManager = alterIsrManager)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -112,9 +118,16 @@
     val time = new MockTime
     val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, time, None,
-      scheduler, logManagers.head, new AtomicBoolean(false), quotaManager,
-      new BrokerTopicStats, MetadataCache.zkMetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = configs.head,
+      time = time,
+      scheduler = scheduler,
+      logManager = logManagers.head,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId),
+      logDirFailureChannel = logDirFailureChannels.head,
+      alterIsrManager = alterIsrManager)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()


@@ -18,7 +18,6 @@ package kafka.server

 import java.io.File
 import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean

 import kafka.cluster.Partition
 import kafka.log.{UnifiedLog, LogManager}
@@ -65,9 +64,16 @@
     alterIsrManager = TestUtils.createAlterIsrManager()
     quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
-    replicaManager = new ReplicaManager(configs.head, metrics, time, None, null, logManager, new AtomicBoolean(false),
-      quotaManager, new BrokerTopicStats, MetadataCache.zkMetadataCache(configs.head.brokerId),
-      new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager)
+    replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = configs.head,
+      time = time,
+      scheduler = null,
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size),
+      alterIsrManager = alterIsrManager)
   }

   @AfterEach


@@ -171,25 +171,26 @@ class KafkaApisTest {
     }

     val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis)
-    new KafkaApis(requestChannel,
-      metadataSupport,
-      replicaManager,
-      groupCoordinator,
-      txnCoordinator,
-      autoTopicCreationManager,
-      brokerId,
-      config,
-      configRepository,
-      metadataCache,
-      metrics,
-      authorizer,
-      quotas,
-      fetchManager,
-      brokerTopicStats,
-      clusterId,
-      time,
-      null,
-      apiVersionManager)
+    new KafkaApis(
+      metadataSupport = metadataSupport,
+      requestChannel = requestChannel,
+      replicaManager = replicaManager,
+      groupCoordinator = groupCoordinator,
+      txnCoordinator = txnCoordinator,
+      autoTopicCreationManager = autoTopicCreationManager,
+      brokerId = brokerId,
+      config = config,
+      configRepository = configRepository,
+      metadataCache = metadataCache,
+      metrics = metrics,
+      authorizer = authorizer,
+      quotas = quotas,
+      fetchManager = fetchManager,
+      brokerTopicStats = brokerTopicStats,
+      clusterId = clusterId,
+      time = time,
+      tokenManager = null,
+      apiVersionManager = apiVersionManager)
   }

   @Test


@@ -18,7 +18,6 @@ package kafka.server

 import java.io.File
 import java.util.{Collections, Optional, Properties}
-import java.util.concurrent.atomic.AtomicBoolean

 import kafka.cluster.Partition
 import kafka.log.{UnifiedLog, LogManager, LogOffsetSnapshot}
@@ -249,9 +248,16 @@
     val leaderBrokerId = configs.head.brokerId
     quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
-    replicaManager = new ReplicaManager(configs.head, metrics, time, None, scheduler, logManager,
-      new AtomicBoolean(false), quotaManager,
-      new BrokerTopicStats, MetadataCache.zkMetadataCache(leaderBrokerId), new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager)
+    replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = configs.head,
+      time = time,
+      scheduler = scheduler,
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(leaderBrokerId),
+      logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size),
+      alterIsrManager = alterIsrManager)

     //create the two replicas
     for ((p, _) <- fetchInfo) {


@@ -21,7 +21,7 @@ import java.io.File
 import java.net.InetAddress
 import java.nio.file.Files
 import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, Properties}
@@ -101,9 +101,16 @@
   @Test
   def testHighWaterMarkDirectoryMapping(): Unit = {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
-      MetadataCache.zkMetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager)
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@@ -121,9 +128,16 @@
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
-      MetadataCache.zkMetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager)
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@@ -138,9 +152,17 @@
   @Test
   def testIllegalRequiredAcks(): Unit = {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
-      MetadataCache.zkMetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, Option(this.getClass.getName))
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager,
+      threadNamePrefix = Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
         assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@@ -185,9 +207,16 @@
     val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
     val metadataCache: MetadataCache = Mockito.mock(classOf[MetadataCache])
     mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
-    val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
-      metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager)
     try {
       val brokerList = Seq[Integer](0, 1).asJava
@@ -1797,11 +1826,22 @@
         .setLeaderEpoch(leaderEpochFromLeader)
         .setEndOffset(offsetFromLeader)).asJava,
       BrokerEndPoint(1, "host1" ,1), time)
-    val replicaManager = new ReplicaManager(config, metrics, time, None, mockScheduler, mockLogMgr,
-      new AtomicBoolean(false), quotaManager, mockBrokerTopicStats,
-      metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
-      mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, Option(this.getClass.getName),
-      alterIsrManager) {
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = mockScheduler,
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      brokerTopicStats = mockBrokerTopicStats,
+      metadataCache = metadataCache,
+      logDirFailureChannel = mockLogDirFailureChannel,
+      alterIsrManager = alterIsrManager,
+      delayedProducePurgatoryParam = Some(mockProducePurgatory),
+      delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
+      delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
+      delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
+      threadNamePrefix = Option(this.getClass.getName)) {

       override protected def createReplicaFetcherManager(metrics: Metrics,
                                                          time: Time,
@@ -1979,11 +2019,21 @@
     val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
       purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)

-    new ReplicaManager(config, metrics, time, None, scheduler, mockLogMgr,
-      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
-      metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
-      mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName),
-      alterIsrManager) {
+    new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = scheduler,
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager,
+      delayedProducePurgatoryParam = Some(mockProducePurgatory),
+      delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
+      delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
+      delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
+      threadNamePrefix = Option(this.getClass.getName)) {

       override protected def createReplicaFetcherManager(
         metrics: Metrics,
@@ -2207,12 +2257,28 @@
     mockGetAliveBrokerFunctions(metadataCache1, aliveBrokers)

     // each replica manager is for a broker
-    val rm0 = new ReplicaManager(config0, metrics, time, None, new MockScheduler(time), mockLogMgr0,
-      new AtomicBoolean(false), quotaManager,
-      brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager)
-    val rm1 = new ReplicaManager(config1, metrics, time, None, new MockScheduler(time), mockLogMgr1,
-      new AtomicBoolean(false), quotaManager,
-      brokerTopicStats2, metadataCache1, new LogDirFailureChannel(config1.logDirs.size), alterIsrManager)
+    val rm0 = new ReplicaManager(
+      metrics = metrics,
+      config = config0,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr0,
+      quotaManagers = quotaManager,
+      brokerTopicStats = brokerTopicStats1,
+      metadataCache = metadataCache0,
+      logDirFailureChannel = new LogDirFailureChannel(config0.logDirs.size),
+      alterIsrManager = alterIsrManager)
+    val rm1 = new ReplicaManager(
+      metrics = metrics,
+      config = config1,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr1,
+      quotaManagers = quotaManager,
+      brokerTopicStats = brokerTopicStats2,
+      metadataCache = metadataCache1,
+      logDirFailureChannel = new LogDirFailureChannel(config1.logDirs.size),
+      alterIsrManager = alterIsrManager)

     (rm0, rm1)
   }
@@ -2453,9 +2519,16 @@
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
-      MetadataCache.zkMetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) {
+    new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager) {
       override def getPartitionOrException(topicPartition: TopicPartition): Partition = {
         throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
       }


@@ -17,7 +17,6 @@
 package kafka.server.epoch

 import java.io.File
-import java.util.concurrent.atomic.AtomicBoolean

 import kafka.log.{UnifiedLog, LogManager}
 import kafka.server.QuotaFactory.QuotaManagers
@@ -65,9 +64,16 @@
     replay(mockLog, logManager)

     // create a replica manager with 1 partition that has 1 replica
-    replicaManager = new ReplicaManager(config, metrics, time, None, null, logManager, new AtomicBoolean(false),
-      quotaManager, new BrokerTopicStats,
-      MetadataCache.zkMetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
+    replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = null,
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager)
     val partition = replicaManager.createPartition(tp)
     partition.setLog(mockLog, isFutureLog = false)
     partition.leaderReplicaIdOpt = Some(config.brokerId)
@@ -88,9 +94,16 @@
     replay(logManager)

     //create a replica manager with 1 partition that has 0 replica
-    replicaManager = new ReplicaManager(config, metrics, time, None, null, logManager, new AtomicBoolean(false),
-      quotaManager, new BrokerTopicStats,
-      MetadataCache.zkMetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
+    replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = null,
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager)
     replicaManager.createPartition(tp)

     //Given
@@ -113,9 +126,16 @@
     replay(logManager)

     //create a replica manager with 0 partition
-    replicaManager = new ReplicaManager(config, metrics, time, None, null, logManager, new AtomicBoolean(false),
-      quotaManager, new BrokerTopicStats,
-      MetadataCache.zkMetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
+    replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = null,
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterIsrManager = alterIsrManager)

     //Given
     val epochRequested: Integer = 5


@@ -42,6 +42,8 @@ import kafka.server.ReplicaFetcherThread;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 import kafka.server.ZkMetadataCache;
+import kafka.server.builders.LogManagerBuilder;
+import kafka.server.builders.ReplicaManagerBuilder;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
@@ -80,10 +82,6 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
-import scala.Option;
-import scala.collection.Iterator;
-import scala.collection.JavaConverters;
-import scala.collection.Map;

 import java.io.File;
 import java.io.IOException;
@@ -96,7 +94,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import scala.Option;
+import scala.collection.Iterator;
+import scala.collection.Map;

 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -129,26 +129,28 @@ public class ReplicaFetcherThreadBenchmark {
         KafkaConfig config = new KafkaConfig(props);
         LogConfig logConfig = createLogConfig();

-        List<File> logDirs = Collections.singletonList(logDir);
         BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
-        logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
-                JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new MockConfigRepository(),
-                logConfig,
-                new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
-                1,
-                1000L,
-                10000L,
-                10000L,
-                1000L,
-                60000,
-                ApiVersion.latestVersion(),
-                scheduler,
-                brokerTopicStats,
-                logDirFailureChannel,
-                Time.SYSTEM,
-                true);
+        List<File> logDirs = Collections.singletonList(logDir);
+        logManager = new LogManagerBuilder().
+            setLogDirs(logDirs).
+            setInitialOfflineDirs(Collections.emptyList()).
+            setConfigRepository(new MockConfigRepository()).
+            setInitialDefaultConfig(logConfig).
+            setCleanerConfig(new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5")).
+            setRecoveryThreadsPerDataDir(1).
+            setFlushCheckMs(1000L).
+            setFlushRecoveryOffsetCheckpointMs(10000L).
+            setFlushStartOffsetCheckpointMs(10000L).
+            setRetentionCheckMs(1000L).
+            setMaxPidExpirationMs(60000).
+            setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+            setScheduler(scheduler).
+            setBrokerTopicStats(brokerTopicStats).
+            setLogDirFailureChannel(logDirFailureChannel).
+            setTime(Time.SYSTEM).
+            setKeepPartitionMetadataFile(true).
+            build();

         LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> initialFetched = new LinkedHashMap<>();
         HashMap<String, Uuid> topicIds = new HashMap<>();
@@ -213,8 +215,19 @@
         ZkMetadataCache metadataCache = new ZkMetadataCache(0);
         metadataCache.updateMetadata(0, updateMetadataRequest);

-        replicaManager = new ReplicaManager(config, metrics, new MockTime(), Option.apply(Mockito.mock(KafkaZkClient.class)), scheduler, logManager, new AtomicBoolean(false),
-                Mockito.mock(QuotaFactory.QuotaManagers.class), brokerTopicStats, metadataCache, new LogDirFailureChannel(logDirs.size()), TestUtils.createAlterIsrManager(), Option.empty());
+        replicaManager = new ReplicaManagerBuilder().
+            setConfig(config).
+            setMetrics(metrics).
+            setTime(new MockTime()).
+            setZkClient(Mockito.mock(KafkaZkClient.class)).
+            setScheduler(scheduler).
+            setLogManager(logManager).
+            setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
+            setBrokerTopicStats(brokerTopicStats).
+            setMetadataCache(metadataCache).
+            setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
+            setAlterIsrManager(TestUtils.createAlterIsrManager()).
+            build();
         fetcher = new ReplicaFetcherBenchThread(config, replicaManager, pool);
         fetcher.addPartitions(initialFetchStates);
         // force a pass to move partitions to fetching state. We do this in the setup phase
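Each of the builders used above follows the same shape: every setter stores one field and returns this, and build() checks required fields and fills in defaults for the rest. A much-reduced, hypothetical sketch of that pattern (Widget and its fields are illustrative names, not the actual Kafka source):

// Hypothetical, much-reduced sketch of the fluent-builder pattern used above.
public class WidgetBuilder {
    private String name = null;           // required field: no usable default
    private long flushCheckMs = 1000L;    // optional field: defaulted

    public WidgetBuilder setName(String name) {
        this.name = name;
        return this;                      // returning this enables chaining
    }

    public WidgetBuilder setFlushCheckMs(long flushCheckMs) {
        this.flushCheckMs = flushCheckMs;
        return this;
    }

    public Widget build() {
        if (name == null) {
            throw new RuntimeException("You must set name.");
        }
        return new Widget(name, flushCheckMs);
    }

    public static class Widget {
        final String name;
        final long flushCheckMs;

        Widget(String name, long flushCheckMs) {
            this.name = name;
            this.flushCheckMs = flushCheckMs;
        }
    }
}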

View File

@@ -39,6 +39,7 @@ import kafka.server.ReplicationQuotaManager;
 import kafka.server.SimpleApiVersionManager;
 import kafka.server.ZkAdminManager;
 import kafka.server.ZkSupport;
+import kafka.server.builders.KafkaApisBuilder;
 import kafka.server.metadata.MockConfigRepository;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.memory.MemoryPool;
@@ -56,7 +57,7 @@ import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -78,6 +79,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
@@ -173,25 +175,27 @@ public class MetadataRequestBenchmark {
         kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk");
         kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
         KafkaConfig config = new KafkaConfig(kafkaProps);
-        return new KafkaApis(requestChannel,
-            new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache),
-            replicaManager,
-            groupCoordinator,
-            transactionCoordinator,
-            autoTopicCreationManager,
-            brokerId,
-            config,
-            new MockConfigRepository(),
-            metadataCache,
-            metrics,
-            Option.empty(),
-            quotaManagers,
-            fetchManager,
-            brokerTopicStats,
-            "clusterId",
-            new SystemTime(),
-            null,
-            new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER));
+        return new KafkaApisBuilder().
+            setRequestChannel(requestChannel).
+            setMetadataSupport(new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache)).
+            setReplicaManager(replicaManager).
+            setGroupCoordinator(groupCoordinator).
+            setTxnCoordinator(transactionCoordinator).
+            setAutoTopicCreationManager(autoTopicCreationManager).
+            setBrokerId(brokerId).
+            setConfig(config).
+            setConfigRepository(new MockConfigRepository()).
+            setMetadataCache(metadataCache).
+            setMetrics(metrics).
+            setAuthorizer(Optional.empty()).
+            setQuotas(quotaManagers).
+            setFetchManager(fetchManager).
+            setBrokerTopicStats(brokerTopicStats).
+            setClusterId("clusterId").
+            setTime(Time.SYSTEM).
+            setTokenManager(null).
+            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER)).
+            build();
     }

     @TearDown(Level.Trial)
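The payoff of this style shows when a constructor gains a parameter: positional call sites like the removed KafkaApis one above must all be edited, while builder chains that do not care about the new option compile unchanged. Continuing the hypothetical WidgetBuilder sketch from earlier:

// Hypothetical usage of the WidgetBuilder sketch above. If WidgetBuilder
// later gains a setSomethingNew(...) with a sensible default, this call
// site does not need to change.
public final class WidgetBuilderExample {
    public static void main(String[] args) {
        WidgetBuilder.Widget widget = new WidgetBuilder().
            setName("example").
            build();                      // flushCheckMs keeps its default
        System.out.println(widget.name + " " + widget.flushCheckMs);
    }
}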

View File

@@ -30,6 +30,7 @@ import kafka.server.AlterIsrManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.LogDirFailureChannel;
 import kafka.server.MetadataCache;
+import kafka.server.builders.LogManagerBuilder;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
@@ -54,14 +55,10 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.compat.java8.OptionConverters;

 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -71,6 +68,8 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.compat.java8.OptionConverters;

 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -98,26 +97,26 @@ public class PartitionMakeFollowerBenchmark {
         scheduler.startup();
         LogConfig logConfig = createLogConfig();

-        List<File> logDirs = Collections.singletonList(logDir);
         BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
-        logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
-                JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new MockConfigRepository(),
-                logConfig,
-                new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
-                1,
-                1000L,
-                10000L,
-                10000L,
-                1000L,
-                60000,
-                ApiVersion.latestVersion(),
-                scheduler,
-                brokerTopicStats,
-                logDirFailureChannel,
-                Time.SYSTEM,
-                true);
+        logManager = new LogManagerBuilder().
+            setLogDirs(Collections.singletonList(logDir)).
+            setInitialOfflineDirs(Collections.emptyList()).
+            setConfigRepository(new MockConfigRepository()).
+            setInitialDefaultConfig(logConfig).
+            setCleanerConfig(new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5")).
+            setRecoveryThreadsPerDataDir(1).
+            setFlushCheckMs(1000L).
+            setFlushRecoveryOffsetCheckpointMs(10000L).
+            setFlushStartOffsetCheckpointMs(10000L).
+            setRetentionCheckMs(1000L).
+            setMaxPidExpirationMs(60000).
+            setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+            setScheduler(scheduler).
+            setBrokerTopicStats(brokerTopicStats).
+            setLogDirFailureChannel(logDirFailureChannel).
+            setTime(Time.SYSTEM).setKeepPartitionMetadataFile(true).
+            build();

         TopicPartition tp = new TopicPartition("topic", 0);
         topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
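These Java benchmarks still cross into Scala types in places; the topicId line above uses the scala-java8-compat OptionConverters to turn a java.util.Optional into a scala.Option. A small self-contained example of that conversion (the class name is invented for illustration):

import java.util.Optional;
import org.apache.kafka.common.Uuid;
import scala.Option;
import scala.compat.java8.OptionConverters;

public final class OptionConversionExample {
    public static void main(String[] args) {
        // java.util.Optional -> scala.Option, as in the topicId line above.
        Option<Uuid> topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
        // And back again: scala.Option -> java.util.Optional.
        Optional<Uuid> roundTripped = OptionConverters.toJava(topicId);
        System.out.println(topicId + " / " + roundTripped);
    }
}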

View File

@@ -31,6 +31,7 @@ import kafka.server.BrokerTopicStats;
 import kafka.server.LogDirFailureChannel;
 import kafka.server.LogOffsetMetadata;
 import kafka.server.MetadataCache;
+import kafka.server.builders.LogManagerBuilder;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
@@ -51,9 +52,6 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.compat.java8.OptionConverters;

 import java.io.File;
 import java.util.ArrayList;
@@ -63,6 +61,8 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.compat.java8.OptionConverters;

 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -85,24 +85,25 @@ public class UpdateFollowerFetchStateBenchmark {
     public void setUp() {
         scheduler.startup();
         LogConfig logConfig = createLogConfig();
-        List<File> logDirs = Collections.singletonList(logDir);
-        logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
-                JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new MockConfigRepository(),
-                logConfig,
-                new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
-                1,
-                1000L,
-                10000L,
-                10000L,
-                1000L,
-                60000,
-                ApiVersion.latestVersion(),
-                scheduler,
-                brokerTopicStats,
-                logDirFailureChannel,
-                Time.SYSTEM,
-                true);
+        logManager = new LogManagerBuilder().
+            setLogDirs(Collections.singletonList(logDir)).
+            setInitialOfflineDirs(Collections.emptyList()).
+            setConfigRepository(new MockConfigRepository()).
+            setInitialDefaultConfig(logConfig).
+            setCleanerConfig(new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5")).
+            setRecoveryThreadsPerDataDir(1).
+            setFlushCheckMs(1000L).
+            setFlushRecoveryOffsetCheckpointMs(10000L).
+            setFlushStartOffsetCheckpointMs(10000L).
+            setRetentionCheckMs(1000L).
+            setMaxPidExpirationMs(60000).
+            setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+            setScheduler(scheduler).
+            setBrokerTopicStats(brokerTopicStats).
+            setLogDirFailureChannel(logDirFailureChannel).
+            setTime(Time.SYSTEM).
+            setKeepPartitionMetadataFile(true).
+            build();
         OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), topicPartition)).thenReturn(Option.apply(0L));
         DelayedOperations delayedOperations = new DelayedOperationsMock();

View File

@@ -28,6 +28,7 @@ import kafka.server.LogDirFailureChannel;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
+import kafka.server.builders.ReplicaManagerBuilder;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
@@ -55,7 +56,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import scala.collection.JavaConverters;
@@ -118,20 +118,18 @@ public class CheckpointBench {
                 this.time, "");
         this.alterIsrManager = TestUtils.createAlterIsrManager();

-        this.replicaManager = new ReplicaManager(
-                this.brokerProperties,
-                this.metrics,
-                this.time,
-                Option.empty(),
-                this.scheduler,
-                this.logManager,
-                new AtomicBoolean(false),
-                this.quotaManagers,
-                brokerTopicStats,
-                metadataCache,
-                this.failureChannel,
-                alterIsrManager,
-                Option.empty());
+        this.replicaManager = new ReplicaManagerBuilder().
+            setConfig(brokerProperties).
+            setMetrics(metrics).
+            setTime(time).
+            setScheduler(scheduler).
+            setLogManager(logManager).
+            setQuotaManagers(quotaManagers).
+            setBrokerTopicStats(brokerTopicStats).
+            setMetadataCache(metadataCache).
+            setLogDirFailureChannel(failureChannel).
+            setAlterIsrManager(alterIsrManager).
+            build();
         replicaManager.startup();
         List<TopicPartition> topicPartitions = new ArrayList<>();

View File

@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.jmh.server;

-import java.util.Properties;
-
 import kafka.api.ApiVersion;
 import kafka.cluster.Partition;
 import kafka.log.CleanerConfig;
@@ -28,10 +26,11 @@ import kafka.server.AlterIsrManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
 import kafka.server.LogDirFailureChannel;
-import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ZkMetadataCache;
+import kafka.server.builders.LogManagerBuilder;
+import kafka.server.builders.ReplicaManagerBuilder;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.server.metadata.ConfigRepository;
 import kafka.server.metadata.MockConfigRepository;
@@ -62,13 +61,13 @@ import org.openjdk.jmh.annotations.Warmup;
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import scala.collection.JavaConverters;
 import scala.Option;
+import scala.collection.JavaConverters;

 @Warmup(iterations = 5)
 @Measurement(iterations = 5)
@@ -92,6 +91,7 @@ public class PartitionCreationBench {
     private ReplicaManager replicaManager;
     private QuotaFactory.QuotaManagers quotaManagers;
+    private KafkaZkClient zkClient;
     private LogDirFailureChannel failureChannel;
     private LogManager logManager;
     private AlterIsrManager alterIsrManager;
@@ -122,52 +122,47 @@
                 Double.MAX_VALUE, 15 * 1000, true, "MD5");
         ConfigRepository configRepository = new MockConfigRepository();
-        this.logManager = new LogManager(JavaConverters.asScalaIteratorConverter(files.iterator()).asScala().toSeq(),
-                JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                configRepository,
-                createLogConfig(),
-                cleanerConfig,
-                1,
-                1000L,
-                10000L,
-                10000L,
-                1000L,
-                60000,
-                ApiVersion.latestVersion(),
-                scheduler,
-                brokerTopicStats,
-                failureChannel,
-                Time.SYSTEM,
-                true);
+        this.logManager = new LogManagerBuilder().
+            setLogDirs(files).
+            setInitialOfflineDirs(Collections.emptyList()).
+            setConfigRepository(configRepository).
+            setInitialDefaultConfig(createLogConfig()).
+            setCleanerConfig(cleanerConfig).
+            setRecoveryThreadsPerDataDir(1).
+            setFlushCheckMs(1000L).
+            setFlushRecoveryOffsetCheckpointMs(10000L).
+            setFlushStartOffsetCheckpointMs(10000L).
+            setRetentionCheckMs(1000L).
+            setMaxPidExpirationMs(60000).
+            setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+            setScheduler(scheduler).
+            setBrokerTopicStats(brokerTopicStats).
+            setLogDirFailureChannel(failureChannel).
+            setTime(Time.SYSTEM).
+            setKeepPartitionMetadataFile(true).
+            build();
         scheduler.startup();
-        final MetadataCache metadataCache =
-                new ZkMetadataCache(this.brokerProperties.brokerId());
-        this.quotaManagers =
-                QuotaFactory.instantiate(this.brokerProperties,
-                        this.metrics,
-                        this.time, "");
-        KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
+        this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, this.time, "");
+        this.zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
             @Override
             public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
                 return new Properties();
             }
         };
         this.alterIsrManager = TestUtils.createAlterIsrManager();
-        this.replicaManager = new ReplicaManager(
-                this.brokerProperties,
-                this.metrics,
-                this.time,
-                Option.apply(zkClient),
-                this.scheduler,
-                this.logManager,
-                new AtomicBoolean(false),
-                this.quotaManagers,
-                brokerTopicStats,
-                metadataCache,
-                this.failureChannel,
-                alterIsrManager,
-                Option.empty());
+        this.replicaManager = new ReplicaManagerBuilder().
+            setConfig(brokerProperties).
+            setMetrics(metrics).
+            setTime(time).
+            setZkClient(zkClient).
+            setScheduler(scheduler).
+            setLogManager(logManager).
+            setQuotaManagers(quotaManagers).
+            setBrokerTopicStats(brokerTopicStats).
+            setMetadataCache(new ZkMetadataCache(this.brokerProperties.brokerId())).
+            setLogDirFailureChannel(failureChannel).
+            setAlterIsrManager(alterIsrManager).
+            build();
         replicaManager.startup();
         replicaManager.checkpointHighWatermarks();
     }
@@ -182,6 +177,7 @@ public class PartitionCreationBench {
         for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
             Utils.delete(dir);
         }
+        this.zkClient.close();
     }

     private static LogConfig createLogConfig() {