MINOR: Add ConfigRepository, use in Partition and KafkaApis (#10005)

`Partition` objects are able to retrieve topic configs when creating their log, and currently they do so with an implementation of `trait TopicConfigFetcher` that uses ZooKeeper.  ZooKeeper is not available when using a Raft-based metadata log, so we need to abstract the retrieval of configs so it can work either with or without ZooKeeper.  This PR introduces `trait ConfigRepository` with `ZkConfigRepository` and `CachedConfigRepository` implementations.  `Partition` objects now use a provided `ConfigRepository` to retrieve topic configs, and we eliminate `TopicConfigFetcher` as it is no longer needed.

`ReplicaManager` now contains an instance of `ConfigRepository` so it can provide it when creating `Partition` instances.

`KafkaApis` needs to be able to handle describe-config requests; it currently delegates that to `ZkAdminManager`, which of course queries ZooKeeper.  To make this work with or without ZooKeeper we move the logic from `ZkAdminManager` into a new `ConfigHelper` class that goes through a `ConfigRepository` instance.  We provide `KafkaApis` with such an instance, and it creates an instance of the helper so it can use that instead of going through `ZkAdminManager`.

Existing tests are sufficient to identify bugs and regressions in `Partition`, `ReplicaManager`, `KafkaApis`, and `ConfigHelper`.  The `ConfigRepository` implementations have their own unit tests.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Ron Dagostino 2021-02-04 15:58:26 -05:00 committed by GitHub
parent ca29727d2a
commit c4ea6fb0a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 736 additions and 357 deletions

View File

@ -17,7 +17,7 @@
package kafka.cluster package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.{Optional, Properties} import java.util.Optional
import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.common.UnexpectedAppendOffsetException import kafka.common.UnexpectedAppendOffsetException
@ -26,9 +26,9 @@ import kafka.log._
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.server._ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.metadata.ConfigRepository
import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._ import kafka.utils._
import kafka.zk.AdminZkClient
import kafka.zookeeper.ZooKeeperClientException import kafka.zookeeper.ZooKeeperClientException
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData} import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData}
@ -51,10 +51,6 @@ trait IsrChangeListener {
def markFailed(): Unit def markFailed(): Unit
} }
trait TopicConfigFetcher {
def fetch(): Properties
}
class DelayedOperations(topicPartition: TopicPartition, class DelayedOperations(topicPartition: TopicPartition,
produce: DelayedOperationPurgatory[DelayedProduce], produce: DelayedOperationPurgatory[DelayedProduce],
fetch: DelayedOperationPurgatory[DelayedFetch], fetch: DelayedOperationPurgatory[DelayedFetch],
@ -73,6 +69,7 @@ class DelayedOperations(topicPartition: TopicPartition,
object Partition extends KafkaMetricsGroup { object Partition extends KafkaMetricsGroup {
def apply(topicPartition: TopicPartition, def apply(topicPartition: TopicPartition,
time: Time, time: Time,
configRepository: ConfigRepository,
replicaManager: ReplicaManager): Partition = { replicaManager: ReplicaManager): Partition = {
val isrChangeListener = new IsrChangeListener { val isrChangeListener = new IsrChangeListener {
@ -87,13 +84,6 @@ object Partition extends KafkaMetricsGroup {
override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark() override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
} }
val configProvider = new TopicConfigFetcher {
override def fetch(): Properties = {
val adminZkClient = new AdminZkClient(replicaManager.zkClient)
adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
}
}
val delayedOperations = new DelayedOperations( val delayedOperations = new DelayedOperations(
topicPartition, topicPartition,
replicaManager.delayedProducePurgatory, replicaManager.delayedProducePurgatory,
@ -105,7 +95,7 @@ object Partition extends KafkaMetricsGroup {
interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion, interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
localBrokerId = replicaManager.config.brokerId, localBrokerId = replicaManager.config.brokerId,
time = time, time = time,
topicConfigProvider = configProvider, configRepository = configRepository,
isrChangeListener = isrChangeListener, isrChangeListener = isrChangeListener,
delayedOperations = delayedOperations, delayedOperations = delayedOperations,
metadataCache = replicaManager.metadataCache, metadataCache = replicaManager.metadataCache,
@ -228,7 +218,7 @@ class Partition(val topicPartition: TopicPartition,
interBrokerProtocolVersion: ApiVersion, interBrokerProtocolVersion: ApiVersion,
localBrokerId: Int, localBrokerId: Int,
time: Time, time: Time,
topicConfigProvider: TopicConfigFetcher, configRepository: ConfigRepository,
isrChangeListener: IsrChangeListener, isrChangeListener: IsrChangeListener,
delayedOperations: DelayedOperations, delayedOperations: DelayedOperations,
metadataCache: MetadataCache, metadataCache: MetadataCache,
@ -342,7 +332,7 @@ class Partition(val topicPartition: TopicPartition,
// Visible for testing // Visible for testing
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
def fetchLogConfig: LogConfig = { def fetchLogConfig: LogConfig = {
val props = topicConfigProvider.fetch() val props = configRepository.topicConfig(topic)
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
} }

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.{Collections, Properties}
import kafka.log.LogConfig
import kafka.server.metadata.ConfigRepository
import kafka.utils.{Log4jController, Logging}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
import org.apache.kafka.common.message.DescribeConfigsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._
class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging {
def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource],
includeSynonyms: Boolean,
includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {
resourceToConfigNames.map { case resource =>
def allConfigs(config: AbstractConfig) = {
config.originals.asScala.filter(_._2 != null) ++ config.nonInternalValues.asScala
}
def createResponseConfig(configs: Map[String, Any],
createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {
val filteredConfigPairs = if (resource.configurationKeys == null)
configs.toBuffer
else
configs.filter { case (configName, _) =>
resource.configurationKeys.asScala.forall(_.contains(configName))
}.toBuffer
val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
.setConfigs(configEntries.asJava)
}
try {
val configResult = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.TOPIC =>
val topic = resource.resourceName
Topic.validate(topic)
if (metadataCache.contains(topic)) {
val topicProps = configRepository.topicConfig(topic)
val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(config), topicProps)
createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
} else {
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
}
case ConfigResource.Type.BROKER =>
if (resource.resourceName == null || resource.resourceName.isEmpty)
createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
else if (resourceNameToBrokerId(resource.resourceName) == config.brokerId)
createResponseConfig(allConfigs(config),
createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
else
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.resourceName}")
case ConfigResource.Type.BROKER_LOGGER =>
if (resource.resourceName == null || resource.resourceName.isEmpty)
throw new InvalidRequestException("Broker id must not be empty")
else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
else
createResponseConfig(Log4jController.loggers,
(name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
.setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
}
configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType)
} catch {
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing describe configs request for resource $resource"
if (e.isInstanceOf[ApiException])
info(message, e)
else
error(message, e)
val err = ApiError.fromThrowable(e)
new DescribeConfigsResponseData.DescribeConfigsResult()
.setResourceName(resource.resourceName)
.setResourceType(resource.resourceType)
.setErrorMessage(err.message)
.setErrorCode(err.error.code)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
}
}
}
def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val configEntryType = LogConfig.configType(name)
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {
val list = LogConfig.TopicConfigSynonyms.get(name)
.map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
.getOrElse(List.empty)
if (!topicProps.containsKey(name))
list
else
new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
.setSource(ConfigSource.TOPIC_CONFIG.id) +: list
}
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val dataType = configResponseType(configEntryType)
val configDocumentation = if (includeDocumentation) logConfig.documentationOf(name) else null
new DescribeConfigsResponseData.DescribeConfigsResourceResult()
.setName(name).setValue(valueAsString).setConfigSource(source)
.setIsSensitive(isSensitive).setReadOnly(false).setSynonyms(synonyms.asJava)
.setDocumentation(configDocumentation).setConfigType(dataType.id)
}
private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val allNames = brokerSynonyms(name)
val configEntryType = KafkaConfig.configType(name)
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive)
null
else value match {
case v: String => v
case _ => ConfigDef.convertToString(value, configEntryType.orNull)
}
val allSynonyms = configSynonyms(name, allNames, isSensitive)
.filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG.id)
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source
val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
val dataType = configResponseType(configEntryType)
val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null
new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name).setValue(valueAsString).setConfigSource(source)
.setIsSensitive(isSensitive).setReadOnly(readOnly).setSynonyms(synonyms.asJava)
.setDocumentation(configDocumentation).setConfigType(dataType.id)
}
private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponseData.DescribeConfigsSynonym] = {
val dynamicConfig = config.dynamicConfig
val allSynonyms = mutable.Buffer[DescribeConfigsResponseData.DescribeConfigsSynonym]()
def maybeAddSynonym(map: Map[String, String], source: ConfigSource)(name: String): Unit = {
map.get(name).map { value =>
val configValue = if (isSensitive) null else value
allSynonyms += new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(configValue).setSource(source.id)
}
}
synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicBrokerConfigs, ConfigSource.DYNAMIC_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicDefaultConfigs, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.staticBrokerConfigs, ConfigSource.STATIC_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.staticDefaultConfigs, ConfigSource.DEFAULT_CONFIG))
allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config
}
private def brokerSynonyms(name: String): List[String] = {
DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
}
private def brokerDocumentation(name: String): String = {
config.documentationOf(name)
}
private def configResponseType(configType: Option[ConfigDef.Type]): DescribeConfigsResponse.ConfigType = {
if (configType.isEmpty)
DescribeConfigsResponse.ConfigType.UNKNOWN
else configType.get match {
case ConfigDef.Type.BOOLEAN => DescribeConfigsResponse.ConfigType.BOOLEAN
case ConfigDef.Type.STRING => DescribeConfigsResponse.ConfigType.STRING
case ConfigDef.Type.INT => DescribeConfigsResponse.ConfigType.INT
case ConfigDef.Type.SHORT => DescribeConfigsResponse.ConfigType.SHORT
case ConfigDef.Type.LONG => DescribeConfigsResponse.ConfigType.LONG
case ConfigDef.Type.DOUBLE => DescribeConfigsResponse.ConfigType.DOUBLE
case ConfigDef.Type.LIST => DescribeConfigsResponse.ConfigType.LIST
case ConfigDef.Type.CLASS => DescribeConfigsResponse.ConfigType.CLASS
case ConfigDef.Type.PASSWORD => DescribeConfigsResponse.ConfigType.PASSWORD
case _ => DescribeConfigsResponse.ConfigType.UNKNOWN
}
}
private def resourceNameToBrokerId(resourceName: String): Int = {
try resourceName.toInt catch {
case _: NumberFormatException =>
throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName")
}
}
}

View File

@ -89,6 +89,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
import kafka.coordinator.group.GroupOverview import kafka.coordinator.group.GroupOverview
import kafka.server.metadata.ConfigRepository
import scala.annotation.nowarn import scala.annotation.nowarn
@ -105,6 +106,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val zkClient: KafkaZkClient, val zkClient: KafkaZkClient,
val brokerId: Int, val brokerId: Int,
val config: KafkaConfig, val config: KafkaConfig,
val configRepository: ConfigRepository,
val metadataCache: MetadataCache, val metadataCache: MetadataCache,
val metrics: Metrics, val metrics: Metrics,
val authorizer: Option[Authorizer], val authorizer: Option[Authorizer],
@ -120,6 +122,7 @@ class KafkaApis(val requestChannel: RequestChannel,
type FetchResponseStats = Map[TopicPartition, RecordConversionStats] type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
this.logIdent = "[KafkaApi-%d] ".format(brokerId) this.logIdent = "[KafkaApi-%d] ".format(brokerId)
val adminZkClient = new AdminZkClient(zkClient) val adminZkClient = new AdminZkClient(zkClient)
val configHelper = new ConfigHelper(metadataCache, config, configRepository)
private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId) private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
val authHelper = new AuthHelper(authorizer) val authHelper = new AuthHelper(authorizer)
@ -2767,7 +2770,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}") case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
} }
} }
val authorizedConfigs = adminManager.describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation) val authorizedConfigs = configHelper.describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
val unauthorizedConfigs = unauthorizedResources.map { resource => val unauthorizedConfigs = unauthorizedResources.map { resource =>
val error = ConfigResource.Type.forId(resource.resourceType) match { val error = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED

View File

@ -32,8 +32,9 @@ import kafka.log.LogManager
import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics} import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
import kafka.network.SocketServer import kafka.network.SocketServer
import kafka.security.CredentialProvider import kafka.security.CredentialProvider
import kafka.server.metadata.ZkConfigRepository
import kafka.utils._ import kafka.utils._
import kafka.zk.{BrokerInfo, KafkaZkClient} import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
@ -139,6 +140,7 @@ class KafkaServer(
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig()) val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
private var _zkClient: KafkaZkClient = null private var _zkClient: KafkaZkClient = null
private var configRepository: ZkConfigRepository = null
val correlationId: AtomicInteger = new AtomicInteger(0) val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties" val brokerMetaPropsFile = "meta.properties"
@ -184,6 +186,7 @@ class KafkaServer(
/* setup zookeeper */ /* setup zookeeper */
initZkClient(time) initZkClient(time)
configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
/* initialize features */ /* initialize features */
_featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient) _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
@ -334,7 +337,7 @@ class KafkaServer(
/* start processing requests */ /* start processing requests */
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, forwardingManager, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, kafkaController, forwardingManager, zkClient, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
@ -342,7 +345,7 @@ class KafkaServer(
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel => socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, forwardingManager, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, kafkaController, forwardingManager, zkClient, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
@ -385,8 +388,8 @@ class KafkaServer(
} }
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = { protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers, new ReplicaManager(config, metrics, time, Some(zkClient), kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager) brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager, configRepository)
} }
private def initZkClient(time: Time): Unit = { private def initZkClient(time: Time): Unit = {

View File

@ -33,6 +33,7 @@ import kafka.server.{FetchMetadata => SFetchMetadata}
import kafka.server.HostedPartition.Online import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.server.metadata.ConfigRepository
import kafka.utils._ import kafka.utils._
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
@ -199,7 +200,7 @@ object ReplicaManager {
class ReplicaManager(val config: KafkaConfig, class ReplicaManager(val config: KafkaConfig,
metrics: Metrics, metrics: Metrics,
time: Time, time: Time,
val zkClient: KafkaZkClient, val zkClient: Option[KafkaZkClient],
scheduler: Scheduler, scheduler: Scheduler,
val logManager: LogManager, val logManager: LogManager,
val isShuttingDown: AtomicBoolean, val isShuttingDown: AtomicBoolean,
@ -212,12 +213,13 @@ class ReplicaManager(val config: KafkaConfig,
val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader], val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
threadNamePrefix: Option[String], threadNamePrefix: Option[String],
configRepository: ConfigRepository,
val alterIsrManager: AlterIsrManager) extends Logging with KafkaMetricsGroup { val alterIsrManager: AlterIsrManager) extends Logging with KafkaMetricsGroup {
def this(config: KafkaConfig, def this(config: KafkaConfig,
metrics: Metrics, metrics: Metrics,
time: Time, time: Time,
zkClient: KafkaZkClient, zkClient: Option[KafkaZkClient],
scheduler: Scheduler, scheduler: Scheduler,
logManager: LogManager, logManager: LogManager,
isShuttingDown: AtomicBoolean, isShuttingDown: AtomicBoolean,
@ -226,6 +228,7 @@ class ReplicaManager(val config: KafkaConfig,
metadataCache: MetadataCache, metadataCache: MetadataCache,
logDirFailureChannel: LogDirFailureChannel, logDirFailureChannel: LogDirFailureChannel,
alterIsrManager: AlterIsrManager, alterIsrManager: AlterIsrManager,
configRepository: ConfigRepository,
threadNamePrefix: Option[String] = None) = { threadNamePrefix: Option[String] = None) = {
this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown, this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel, quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
@ -240,14 +243,14 @@ class ReplicaManager(val config: KafkaConfig,
purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests), purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
DelayedOperationPurgatory[DelayedElectLeader]( DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", brokerId = config.brokerId), purgatoryName = "ElectLeader", brokerId = config.brokerId),
threadNamePrefix, alterIsrManager) threadNamePrefix, configRepository, alterIsrManager)
} }
/* epoch of the controller that last changed the leader */ /* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
private val localBrokerId = config.brokerId private val localBrokerId = config.brokerId
private val allPartitions = new Pool[TopicPartition, HostedPartition]( private val allPartitions = new Pool[TopicPartition, HostedPartition](
valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, this))) valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, configRepository, this)))
) )
private val replicaStateChangeLock = new Object private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower) val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
@ -514,7 +517,7 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing // Visible for testing
def createPartition(topicPartition: TopicPartition): Partition = { def createPartition(topicPartition: TopicPartition): Partition = {
val partition = Partition(topicPartition, time, this) val partition = Partition(topicPartition, time, configRepository, this)
allPartitions.put(topicPartition, HostedPartition.Online(partition)) allPartitions.put(topicPartition, HostedPartition.Online(partition))
partition partition
} }
@ -1369,7 +1372,7 @@ class ReplicaManager(val config: KafkaConfig,
Some(partition) Some(partition)
case HostedPartition.None => case HostedPartition.None =>
val partition = Partition(topicPartition, time, this) val partition = Partition(topicPartition, time, configRepository, this)
allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition)) allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
Some(partition) Some(partition)
} }
@ -1881,7 +1884,11 @@ class ReplicaManager(val config: KafkaConfig,
logManager.handleLogDirFailure(dir) logManager.handleLogDirFailure(dir)
if (sendZkNotification) if (sendZkNotification)
zkClient.propagateLogDirEvent(localBrokerId) if (zkClient.isEmpty) {
warn("Unable to propagate log dir failure via Zookeeper in KIP-500 mode") // will be handled via KIP-589
} else {
zkClient.get.propagateLogDirEvent(localBrokerId)
}
warn(s"Stopped serving replicas in dir $dir") warn(s"Stopped serving replicas in dir $dir")
} }

View File

@ -17,7 +17,7 @@
package kafka.server package kafka.server
import java.util import java.util
import java.util.{Collections, Properties} import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils} import kafka.admin.{AdminOperationException, AdminUtils}
import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.common.TopicAlreadyMarkedForDeletionException
@ -25,6 +25,7 @@ import kafka.log.LogConfig
import kafka.utils.Log4jController import kafka.utils.Log4jController
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.server.DynamicConfig.QuotaConfigs import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.metadata.ZkConfigRepository
import kafka.utils._ import kafka.utils._
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient} import kafka.zk.{AdminZkClient, KafkaZkClient}
@ -32,16 +33,14 @@ import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.Uuid import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.ConfigDef.ConfigKey import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig} import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException} import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicConfigs, CreatableTopicResult} import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicConfigs, CreatableTopicResult}
import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, AlterUserScramCredentialsResponseData, DescribeConfigsResponseData, DescribeUserScramCredentialsResponseData} import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, AlterUserScramCredentialsResponseData, DescribeUserScramCredentialsResponseData}
import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.{ScramMechanism => InternalScramMechanism} import org.apache.kafka.common.security.scram.internals.{ScramMechanism => InternalScramMechanism}
@ -50,8 +49,7 @@ import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.CreateTopicsRequest._ import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter} import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.common.utils.Sanitizer
@ -67,6 +65,7 @@ class ZkAdminManager(val config: KafkaConfig,
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId) private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val adminZkClient = new AdminZkClient(zkClient) private val adminZkClient = new AdminZkClient(zkClient)
private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient))
private val createTopicPolicy = private val createTopicPolicy =
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy])) Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
@ -118,7 +117,7 @@ class ZkAdminManager(val config: KafkaConfig,
assignments: Map[Int, Seq[Int]]): Unit = { assignments: Map[Int, Seq[Int]]): Unit = {
metadataAndConfigs.get(topicName).foreach { result => metadataAndConfigs.get(topicName).foreach { result =>
val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(config), configs) val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(config), configs)
val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _) val createEntry = configHelper.createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _)
val topicConfigs = logConfig.values.asScala.map { case (k, v) => val topicConfigs = logConfig.values.asScala.map { case (k, v) =>
val entry = createEntry(k, v) val entry = createEntry(k, v)
new CreatableTopicConfigs() new CreatableTopicConfigs()
@ -389,85 +388,6 @@ class ZkAdminManager(val config: KafkaConfig,
} }
} }
def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource],
includeSynonyms: Boolean,
includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {
resourceToConfigNames.map { case resource =>
def allConfigs(config: AbstractConfig) = {
config.originals.asScala.filter(_._2 != null) ++ config.nonInternalValues.asScala
}
def createResponseConfig(configs: Map[String, Any],
createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {
val filteredConfigPairs = if (resource.configurationKeys == null)
configs.toBuffer
else
configs.filter { case (configName, _) =>
resource.configurationKeys.asScala.forall(_.contains(configName))
}.toBuffer
val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
.setConfigs(configEntries.asJava)
}
try {
val configResult = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.TOPIC =>
val topic = resource.resourceName
Topic.validate(topic)
if (metadataCache.contains(topic)) {
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(config), topicProps)
createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
} else {
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
}
case ConfigResource.Type.BROKER =>
if (resource.resourceName == null || resource.resourceName.isEmpty)
createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
else if (resourceNameToBrokerId(resource.resourceName) == config.brokerId)
createResponseConfig(allConfigs(config),
createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
else
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.resourceName}")
case ConfigResource.Type.BROKER_LOGGER =>
if (resource.resourceName == null || resource.resourceName.isEmpty)
throw new InvalidRequestException("Broker id must not be empty")
else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
else
createResponseConfig(Log4jController.loggers,
(name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
.setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
}
configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType)
} catch {
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing describe configs request for resource $resource"
if (e.isInstanceOf[ApiException])
info(message, e)
else
error(message, e)
val err = ApiError.fromThrowable(e)
new DescribeConfigsResponseData.DescribeConfigsResult()
.setResourceName(resource.resourceName)
.setResourceType(resource.resourceType)
.setErrorMessage(err.message)
.setErrorCode(err.error.code)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
}
}.toList
}
def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = { def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
configs.map { case (resource, config) => configs.map { case (resource, config) =>
@ -728,98 +648,6 @@ class ZkAdminManager(val config: KafkaConfig,
} }
} }
private def brokerSynonyms(name: String): List[String] = {
DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
}
private def brokerDocumentation(name: String): String = {
config.documentationOf(name)
}
private def configResponseType(configType: Option[ConfigDef.Type]): DescribeConfigsResponse.ConfigType = {
if (configType.isEmpty)
DescribeConfigsResponse.ConfigType.UNKNOWN
else configType.get match {
case ConfigDef.Type.BOOLEAN => DescribeConfigsResponse.ConfigType.BOOLEAN
case ConfigDef.Type.STRING => DescribeConfigsResponse.ConfigType.STRING
case ConfigDef.Type.INT => DescribeConfigsResponse.ConfigType.INT
case ConfigDef.Type.SHORT => DescribeConfigsResponse.ConfigType.SHORT
case ConfigDef.Type.LONG => DescribeConfigsResponse.ConfigType.LONG
case ConfigDef.Type.DOUBLE => DescribeConfigsResponse.ConfigType.DOUBLE
case ConfigDef.Type.LIST => DescribeConfigsResponse.ConfigType.LIST
case ConfigDef.Type.CLASS => DescribeConfigsResponse.ConfigType.CLASS
case ConfigDef.Type.PASSWORD => DescribeConfigsResponse.ConfigType.PASSWORD
case _ => DescribeConfigsResponse.ConfigType.UNKNOWN
}
}
private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponseData.DescribeConfigsSynonym] = {
val dynamicConfig = config.dynamicConfig
val allSynonyms = mutable.Buffer[DescribeConfigsResponseData.DescribeConfigsSynonym]()
def maybeAddSynonym(map: Map[String, String], source: ConfigSource)(name: String): Unit = {
map.get(name).map { value =>
val configValue = if (isSensitive) null else value
allSynonyms += new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(configValue).setSource(source.id)
}
}
synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicBrokerConfigs, ConfigSource.DYNAMIC_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicDefaultConfigs, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.staticBrokerConfigs, ConfigSource.STATIC_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.staticDefaultConfigs, ConfigSource.DEFAULT_CONFIG))
allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config
}
private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val configEntryType = LogConfig.configType(name)
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {
val list = LogConfig.TopicConfigSynonyms.get(name)
.map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
.getOrElse(List.empty)
if (!topicProps.containsKey(name))
list
else
new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
.setSource(ConfigSource.TOPIC_CONFIG.id) +: list
}
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val dataType = configResponseType(configEntryType)
val configDocumentation = if (includeDocumentation) logConfig.documentationOf(name) else null
new DescribeConfigsResponseData.DescribeConfigsResourceResult()
.setName(name).setValue(valueAsString).setConfigSource(source)
.setIsSensitive(isSensitive).setReadOnly(false).setSynonyms(synonyms.asJava)
.setDocumentation(configDocumentation).setConfigType(dataType.id)
}
private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val allNames = brokerSynonyms(name)
val configEntryType = KafkaConfig.configType(name)
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive)
null
else value match {
case v: String => v
case _ => ConfigDef.convertToString(value, configEntryType.orNull)
}
val allSynonyms = configSynonyms(name, allNames, isSensitive)
.filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG.id)
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source
val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
val dataType = configResponseType(configEntryType)
val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null
new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name).setValue(valueAsString).setConfigSource(source)
.setIsSensitive(isSensitive).setReadOnly(readOnly).setSynonyms(synonyms.asJava)
.setDocumentation(configDocumentation).setConfigType(dataType.id)
}
private def sanitizeEntityName(entityName: String): String = private def sanitizeEntityName(entityName: String): String =
Option(entityName) match { Option(entityName) match {
case None => ConfigEntityName.Default case None => ConfigEntityName.Default

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import scala.jdk.CollectionConverters._
/**
* A ConfigRepository that stores configurations locally.
*/
class CachedConfigRepository extends ConfigRepository {
val configMap = new ConcurrentHashMap[ConfigResource, util.HashMap[String, String]]
/**
* Set the topic config for the given topic name and the given key to the given value.
*
* @param topicName the name of the topic for which the config will be set
* @param key the key identifying the topic config to set
* @param value the value to set for the topic config with null implying a removal
*/
def setTopicConfig(topicName: String, key: String, value: String): Unit = {
setConfig(new ConfigResource(Type.TOPIC, topicName), key, value)
}
/**
* Set the broker config for the given broker ID and the given key to the given value.
*
* @param brokerId the ID of the broker for which the config will be set
* @param key the key identifying the broker config to set
* @param value the value to set for the broker config with null implying a removal
*/
def setBrokerConfig(brokerId: Int, key: String, value: String): Unit = {
setConfig(new ConfigResource(Type.BROKER, brokerId.toString()), key, value)
}
/**
* Set the config for the given resource and the given key to the given value.
*
* @param configResource the resource for which the config will be set
* @param key the key identifying the resource config to set
* @param value the value to set for the resource config with null implying a removal
*/
def setConfig(configResource: ConfigResource, key: String, value: String): Unit = {
configMap.compute(configResource, new BiFunction[ConfigResource, util.HashMap[String, String], util.HashMap[String, String]] {
override def apply(resource: ConfigResource,
curConfig: util.HashMap[String, String]): util.HashMap[String, String] = {
if (value == null) {
if (curConfig == null) {
null
} else {
val newConfig = new util.HashMap[String, String](curConfig)
newConfig.remove(key)
if (newConfig.isEmpty) {
null
} else {
newConfig
}
}
} else {
if (curConfig == null) {
val newConfig = new util.HashMap[String, String](1)
newConfig.put(key, value)
newConfig
} else {
val newConfig = new util.HashMap[String, String](curConfig.size() + 1)
newConfig.putAll(curConfig)
newConfig.put(key, value)
newConfig
}
}
}
})
}
override def config(configResource: ConfigResource): Properties = {
val properties = new Properties()
Option(configMap.get(configResource)).foreach {
_.entrySet().iterator().asScala.foreach { case e =>
properties.put(e.getKey, e.getValue)
}
}
properties
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
trait ConfigRepository {
/**
* Return a copy of the topic configuration for the given topic. Future changes will not be reflected.
*
* @param topicName the name of the topic for which the configuration will be returned
* @return a copy of the topic configuration for the given topic
*/
def topicConfig(topicName: String): Properties = {
config(new ConfigResource(Type.TOPIC, topicName))
}
/**
* Return a copy of the broker configuration for the given broker. Future changes will not be reflected.
*
* @param brokerId the id of the broker for which configuration will be returned
* @return a copy of the broker configuration for the given broker
*/
def brokerConfig(brokerId: Int): Properties = {
config(new ConfigResource(Type.BROKER, brokerId.toString))
}
/**
* Return a copy of the configuration for the given resource. Future changes will not be reflected.
* @param configResource the resource for which the configuration will be returned
* @return a copy of the configuration for the given resource
*/
def config(configResource: ConfigResource): Properties
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util.Properties
import kafka.server.ConfigType
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
object ZkConfigRepository {
def apply(zkClient: KafkaZkClient): ZkConfigRepository =
new ZkConfigRepository(new AdminZkClient(zkClient))
}
class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {
override def config(configResource: ConfigResource): Properties = {
val configResourceType = configResource.`type`()
val configTypeForZk = if (configResourceType == Type.TOPIC) {
ConfigType.Topic
} else if (configResourceType == Type.BROKER) {
ConfigType.Broker
} else {
throw new IllegalArgumentException(s"Unsupported config type: $configResourceType")
}
adminZkClient.fetchEntityConfig(configTypeForZk, configResource.name())
}
}

View File

@ -23,6 +23,7 @@ import kafka.api.ApiVersion
import kafka.log.{CleanerConfig, LogConfig, LogManager} import kafka.log.{CleanerConfig, LogConfig, LogManager}
import kafka.server.{Defaults, MetadataCache} import kafka.server.{Defaults, MetadataCache}
import kafka.server.checkpoints.OffsetCheckpoints import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.metadata.ConfigRepository
import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener} import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener}
import kafka.utils.{MockTime, TestUtils} import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -43,7 +44,7 @@ class AbstractPartitionTest {
var alterIsrManager: MockAlterIsrManager = _ var alterIsrManager: MockAlterIsrManager = _
var isrChangeListener: MockIsrChangeListener = _ var isrChangeListener: MockIsrChangeListener = _
var logConfig: LogConfig = _ var logConfig: LogConfig = _
var topicConfigProvider: TopicConfigFetcher = _ var configRepository: ConfigRepository = _
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations]) val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val metadataCache: MetadataCache = mock(classOf[MetadataCache]) val metadataCache: MetadataCache = mock(classOf[MetadataCache])
val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@ -55,7 +56,7 @@ class AbstractPartitionTest {
val logProps = createLogProperties(Map.empty) val logProps = createLogProperties(Map.empty)
logConfig = LogConfig(logProps) logConfig = LogConfig(logProps)
topicConfigProvider = TestUtils.createTopicConfigProvider(logProps) configRepository = TestUtils.createConfigRepository(topicPartition.topic(), logProps)
tmpDir = TestUtils.tempDir() tmpDir = TestUtils.tempDir()
logDir1 = TestUtils.randomPartitionLogDir(tmpDir) logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
@ -71,7 +72,7 @@ class AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion, interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId, localBrokerId = brokerId,
time, time,
topicConfigProvider, configRepository,
isrChangeListener, isrChangeListener,
delayedOperations, delayedOperations,
metadataCache, metadataCache,

View File

@ -248,7 +248,7 @@ class PartitionLockTest extends Logging {
val leaderEpoch = 1 val leaderEpoch = 1
val brokerId = 0 val brokerId = 0
val topicPartition = new TopicPartition("test-topic", 0) val topicPartition = new TopicPartition("test-topic", 0)
val topicConfigProvider = TestUtils.createTopicConfigProvider(createLogProperties(Map.empty)) val topicConfigProvider = TestUtils.createConfigRepository(topicPartition.topic(), createLogProperties(Map.empty))
val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener]) val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener])
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations]) val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val metadataCache: MetadataCache = mock(classOf[MetadataCache]) val metadataCache: MetadataCache = mock(classOf[MetadataCache])

View File

@ -224,7 +224,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion, interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId, localBrokerId = brokerId,
time, time,
topicConfigProvider, configRepository,
isrChangeListener, isrChangeListener,
delayedOperations, delayedOperations,
metadataCache, metadataCache,
@ -1569,7 +1569,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = KAFKA_2_6_IV0, // shouldn't matter, but set this to a ZK isr version interBrokerProtocolVersion = KAFKA_2_6_IV0, // shouldn't matter, but set this to a ZK isr version
localBrokerId = brokerId, localBrokerId = brokerId,
time, time,
topicConfigProvider, configRepository,
isrChangeListener, isrChangeListener,
delayedOperations, delayedOperations,
metadataCache, metadataCache,
@ -1689,7 +1689,7 @@ class PartitionTest extends AbstractPartitionTest {
val topicPartition = new TopicPartition("test", 1) val topicPartition = new TopicPartition("test", 1)
val partition = new Partition( val partition = new Partition(
topicPartition, 1000, ApiVersion.latestVersion, 0, topicPartition, 1000, ApiVersion.latestVersion, 0,
new SystemTime(), topicConfigProvider, mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]), new SystemTime(), configRepository, mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterIsrManager])) mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterIsrManager]))
val replicas = Seq(0, 1, 2, 3) val replicas = Seq(0, 1, 2, 3)
@ -1724,13 +1724,13 @@ class PartitionTest extends AbstractPartitionTest {
@Test @Test
def testLogConfigNotDirty(): Unit = { def testLogConfigNotDirty(): Unit = {
val spyLogManager = spy(logManager) val spyLogManager = spy(logManager)
val spyConfigProvider = spy(topicConfigProvider) val spyConfigRepository = spy(configRepository)
val partition = new Partition(topicPartition, val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion, interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId, localBrokerId = brokerId,
time, time,
spyConfigProvider, spyConfigRepository,
isrChangeListener, isrChangeListener,
delayedOperations, delayedOperations,
metadataCache, metadataCache,
@ -1745,8 +1745,8 @@ class PartitionTest extends AbstractPartitionTest {
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
// We should get config from ZK only once // We should retrieve configs only once
verify(spyConfigProvider, times(1)).fetch() verify(spyConfigRepository, times(1)).topicConfig(topicPartition.topic())
} }
/** /**
@ -1755,7 +1755,7 @@ class PartitionTest extends AbstractPartitionTest {
*/ */
@Test @Test
def testLogConfigDirtyAsTopicUpdated(): Unit = { def testLogConfigDirtyAsTopicUpdated(): Unit = {
val spyConfigProvider = spy(topicConfigProvider) val spyConfigRepository = spy(configRepository)
val spyLogManager = spy(logManager) val spyLogManager = spy(logManager)
doAnswer((invocation: InvocationOnMock) => { doAnswer((invocation: InvocationOnMock) => {
logManager.initializingLog(topicPartition) logManager.initializingLog(topicPartition)
@ -1767,7 +1767,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion, interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId, localBrokerId = brokerId,
time, time,
spyConfigProvider, spyConfigRepository,
isrChangeListener, isrChangeListener,
delayedOperations, delayedOperations,
metadataCache, metadataCache,
@ -1782,9 +1782,9 @@ class PartitionTest extends AbstractPartitionTest {
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
// We should get config from ZK twice, once before log is created, and second time once // We should retrieve configs twice, once before log is created, and second time once
// we find log config is dirty and refresh it. // we find log config is dirty and refresh it.
verify(spyConfigProvider, times(2)).fetch() verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
} }
/** /**
@ -1793,7 +1793,7 @@ class PartitionTest extends AbstractPartitionTest {
*/ */
@Test @Test
def testLogConfigDirtyAsBrokerUpdated(): Unit = { def testLogConfigDirtyAsBrokerUpdated(): Unit = {
val spyConfigProvider = spy(topicConfigProvider) val spyConfigRepository = spy(configRepository)
val spyLogManager = spy(logManager) val spyLogManager = spy(logManager)
doAnswer((invocation: InvocationOnMock) => { doAnswer((invocation: InvocationOnMock) => {
logManager.initializingLog(topicPartition) logManager.initializingLog(topicPartition)
@ -1805,7 +1805,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion, interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId, localBrokerId = brokerId,
time, time,
spyConfigProvider, spyConfigRepository,
isrChangeListener, isrChangeListener,
delayedOperations, delayedOperations,
metadataCache, metadataCache,
@ -1820,9 +1820,9 @@ class PartitionTest extends AbstractPartitionTest {
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
// We should get config from ZK twice, once before log is created, and second time once // We should get configs twice, once before log is created, and second time once
// we find log config is dirty and refresh it. // we find log config is dirty and refresh it.
verify(spyConfigProvider, times(2)).fetch() verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
} }
private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = { private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = {

View File

@ -158,7 +158,7 @@ object AbstractCoordinatorConcurrencyTest {
} }
class TestReplicaManager extends ReplicaManager( class TestReplicaManager extends ReplicaManager(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None, null) { null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None, null, null) {
var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _ var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _
var watchKeys: mutable.Set[TopicPartitionOperationKey] = _ var watchKeys: mutable.Set[TopicPartitionOperationKey] = _

View File

@ -21,14 +21,13 @@ import java.io.File
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.easymock.EasyMock
import org.junit.jupiter.api._ import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import kafka.utils.{KafkaScheduler, MockTime, TestUtils} import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.server.metadata.CachedConfigRepository
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.record.SimpleRecord
@ -36,7 +35,7 @@ class HighwatermarkPersistenceTest {
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps) val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
val topic = "foo" val topic = "foo"
val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) val configRepository = new CachedConfigRepository()
val logManagers = configs map { config => val logManagers = configs map { config =>
TestUtils.createLogManager( TestUtils.createLogManager(
logDirs = config.logDirs.map(new File(_)), logDirs = config.logDirs.map(new File(_)),
@ -57,9 +56,6 @@ class HighwatermarkPersistenceTest {
@Test @Test
def testHighWatermarkPersistenceSinglePartition(): Unit = { def testHighWatermarkPersistenceSinglePartition(): Unit = {
// mock zkclient
EasyMock.replay(zkClient)
// create kafka scheduler // create kafka scheduler
val scheduler = new KafkaScheduler(2) val scheduler = new KafkaScheduler(2)
scheduler.startup() scheduler.startup()
@ -67,9 +63,10 @@ class HighwatermarkPersistenceTest {
val time = new MockTime val time = new MockTime
val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "") val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
// create replica manager // create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler, val replicaManager = new ReplicaManager(configs.head, metrics, time, None, scheduler,
logManagers.head, new AtomicBoolean(false), quotaManager, logManagers.head, new AtomicBoolean(false), quotaManager,
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager) new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager,
configRepository)
replicaManager.startup() replicaManager.startup()
try { try {
replicaManager.checkpointHighWatermarks() replicaManager.checkpointHighWatermarks()
@ -96,7 +93,6 @@ class HighwatermarkPersistenceTest {
replicaManager.checkpointHighWatermarks() replicaManager.checkpointHighWatermarks()
fooPartition0Hw = hwmFor(replicaManager, topic, 0) fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(log0.highWatermark, fooPartition0Hw) assertEquals(log0.highWatermark, fooPartition0Hw)
EasyMock.verify(zkClient)
} finally { } finally {
// shutdown the replica manager upon test completion // shutdown the replica manager upon test completion
replicaManager.shutdown(false) replicaManager.shutdown(false)
@ -110,8 +106,6 @@ class HighwatermarkPersistenceTest {
def testHighWatermarkPersistenceMultiplePartitions(): Unit = { def testHighWatermarkPersistenceMultiplePartitions(): Unit = {
val topic1 = "foo1" val topic1 = "foo1"
val topic2 = "foo2" val topic2 = "foo2"
// mock zkclient
EasyMock.replay(zkClient)
// create kafka scheduler // create kafka scheduler
val scheduler = new KafkaScheduler(2) val scheduler = new KafkaScheduler(2)
scheduler.startup() scheduler.startup()
@ -119,9 +113,9 @@ class HighwatermarkPersistenceTest {
val time = new MockTime val time = new MockTime
val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "") val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
// create replica manager // create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, val replicaManager = new ReplicaManager(configs.head, metrics, time, None,
scheduler, logManagers.head, new AtomicBoolean(false), quotaManager, scheduler, logManagers.head, new AtomicBoolean(false), quotaManager,
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager) new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager, configRepository)
replicaManager.startup() replicaManager.startup()
try { try {
replicaManager.checkpointHighWatermarks() replicaManager.checkpointHighWatermarks()
@ -168,7 +162,6 @@ class HighwatermarkPersistenceTest {
// verify checkpointed hw for topic 1 // verify checkpointed hw for topic 1
topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(10L, topic1Partition0Hw) assertEquals(10L, topic1Partition0Hw)
EasyMock.verify(zkClient)
} finally { } finally {
// shutdown the replica manager upon test completion // shutdown the replica manager upon test completion
replicaManager.shutdown(false) replicaManager.shutdown(false)

View File

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.log.{Log, LogManager} import kafka.log.{Log, LogManager}
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.CachedConfigRepository
import kafka.utils.TestUtils.MockAlterIsrManager import kafka.utils.TestUtils.MockAlterIsrManager
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -65,9 +66,9 @@ class IsrExpirationTest {
alterIsrManager = TestUtils.createAlterIsrManager() alterIsrManager = TestUtils.createAlterIsrManager()
quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "") quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, logManager, new AtomicBoolean(false), replicaManager = new ReplicaManager(configs.head, metrics, time, None, null, logManager, new AtomicBoolean(false),
quotaManager, new BrokerTopicStats, new MetadataCache(configs.head.brokerId), quotaManager, new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager) new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager, new CachedConfigRepository())
} }
@AfterEach @AfterEach

View File

@ -34,6 +34,7 @@ import kafka.log.AppendOrigin
import kafka.network.RequestChannel import kafka.network.RequestChannel
import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse} import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse}
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, CachedConfigRepository}
import kafka.utils.{MockTime, TestUtils} import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.NodeApiVersions import org.apache.kafka.clients.NodeApiVersions
@ -45,6 +46,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic} import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
@ -118,7 +120,8 @@ class KafkaApisTest {
def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion, def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
authorizer: Option[Authorizer] = None, authorizer: Option[Authorizer] = None,
enableForwarding: Boolean = false): KafkaApis = { enableForwarding: Boolean = false,
configRepository: ConfigRepository = new CachedConfigRepository()): KafkaApis = {
val brokerFeatures = BrokerFeatures.createDefault() val brokerFeatures = BrokerFeatures.createDefault()
val cache = new FinalizedFeatureCache(brokerFeatures) val cache = new FinalizedFeatureCache(brokerFeatures)
val properties = TestUtils.createBrokerConfig(brokerId, "zk") val properties = TestUtils.createBrokerConfig(brokerId, "zk")
@ -140,6 +143,7 @@ class KafkaApisTest {
zkClient, zkClient,
brokerId, brokerId,
new KafkaConfig(properties), new KafkaConfig(properties),
configRepository,
metadataCache, metadataCache,
metrics, metrics,
authorizer, authorizer,
@ -173,30 +177,50 @@ class KafkaApisTest {
.andReturn(Seq(AuthorizationResult.ALLOWED).asJava) .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
.once() .once()
expectNoThrottling() val capturedResponse = expectNoThrottling()
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName) val configRepository: ConfigRepository = EasyMock.strictMock(classOf[ConfigRepository])
EasyMock.expect(adminManager.describeConfigs(anyObject(), EasyMock.eq(true), EasyMock.eq(false))) val topicConfigs = new Properties()
.andReturn( val propName = "min.insync.replicas"
List(new DescribeConfigsResponseData.DescribeConfigsResult() val propValue = "3"
.setResourceName(configResource.name) topicConfigs.put(propName, propValue)
.setResourceType(configResource.`type`.id) EasyMock.expect(configRepository.topicConfig(resourceName)).andReturn(topicConfigs)
.setErrorCode(Errors.NONE.code)
.setConfigs(Collections.emptyList())))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer, metadataCache =
adminManager) EasyMock.partialMockBuilder(classOf[MetadataCache])
.withConstructor(classOf[Int])
.withArgs(Int.box(brokerId)) // Need to box it for Scala 2.12 and before
.addMockedMethod("contains", classOf[String])
.createMock()
val request = buildRequest(new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() expect(metadataCache.contains(resourceName)).andReturn(true)
EasyMock.replay(metadataCache, replicaManager, clientRequestQuotaManager, requestChannel, authorizer, configRepository, adminManager)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
.setIncludeSynonyms(true) .setIncludeSynonyms(true)
.setResources(List(new DescribeConfigsRequestData.DescribeConfigsResource() .setResources(List(new DescribeConfigsRequestData.DescribeConfigsResource()
.setResourceName("topic-1") .setResourceName(resourceName)
.setResourceType(ConfigResource.Type.TOPIC.id)).asJava)) .setResourceType(ConfigResource.Type.TOPIC.id)).asJava))
.build(requestHeader.apiVersion), .build(requestHeader.apiVersion)
val request = buildRequest(describeConfigsRequest,
requestHeader = Option(requestHeader)) requestHeader = Option(requestHeader))
createKafkaApis(authorizer = Some(authorizer)).handleDescribeConfigsRequest(request) createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository).handleDescribeConfigsRequest(request)
verify(authorizer, adminManager) verify(authorizer, replicaManager)
val response = readResponse(describeConfigsRequest, capturedResponse)
.asInstanceOf[DescribeConfigsResponse]
val results = response.data().results()
assertEquals(1, results.size())
val describeConfigsResult: DescribeConfigsResult = results.get(0)
assertEquals(ConfigResource.Type.TOPIC.id, describeConfigsResult.resourceType())
assertEquals(resourceName, describeConfigsResult.resourceName())
val configs = describeConfigsResult.configs().asScala.filter(_.name() == propName)
assertEquals(1, configs.length)
val describeConfigsResponseData = configs(0)
assertEquals(propName, describeConfigsResponseData.name())
assertEquals(propValue, describeConfigsResponseData.value())
} }
@Test @Test
@ -2006,7 +2030,7 @@ class KafkaApisTest {
} }
def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = { def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val capturedResponse = expectNoThrottling() val capturedResponse = expectNoThrottling()
@ -2043,7 +2067,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(joinGroupRequest) val requestChannelRequest = buildRequest(joinGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest) createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
@ -2078,7 +2102,7 @@ class KafkaApisTest {
} }
def testJoinGroupProtocolType(version: Short): Unit = { def testJoinGroupProtocolType(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val capturedResponse = expectNoThrottling() val capturedResponse = expectNoThrottling()
@ -2116,7 +2140,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(joinGroupRequest) val requestChannelRequest = buildRequest(joinGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest) createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
@ -2159,7 +2183,7 @@ class KafkaApisTest {
} }
def testSyncGroupProtocolTypeAndName(version: Short): Unit = { def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val capturedResponse = expectNoThrottling() val capturedResponse = expectNoThrottling()
@ -2192,7 +2216,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(syncGroupRequest) val requestChannelRequest = buildRequest(syncGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleSyncGroupRequest(requestChannelRequest) createKafkaApis().handleSyncGroupRequest(requestChannelRequest)
@ -2228,7 +2252,7 @@ class KafkaApisTest {
} }
def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): Unit = { def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val capturedResponse = expectNoThrottling() val capturedResponse = expectNoThrottling()
@ -2261,7 +2285,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(syncGroupRequest) val requestChannelRequest = buildRequest(syncGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel) EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleSyncGroupRequest(requestChannelRequest) createKafkaApis().handleSyncGroupRequest(requestChannelRequest)

View File

@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.log.{Log, LogManager, LogOffsetSnapshot} import kafka.log.{Log, LogManager, LogOffsetSnapshot}
import kafka.utils._ import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
@ -31,6 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.easymock.EasyMock import org.easymock.EasyMock
import EasyMock._ import EasyMock._
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.CachedConfigRepository
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}
@ -199,7 +199,7 @@ class ReplicaManagerQuotasTest {
def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record,
bothReplicasInSync: Boolean = false): Unit = { bothReplicasInSync: Boolean = false): Unit = {
val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) val configRepository = new CachedConfigRepository()
val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler]) val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])
//Create log which handles both a regular read and a 0 bytes read //Create log which handles both a regular read and a 0 bytes read
@ -243,9 +243,9 @@ class ReplicaManagerQuotasTest {
val leaderBrokerId = configs.head.brokerId val leaderBrokerId = configs.head.brokerId
quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "") quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler, logManager, replicaManager = new ReplicaManager(configs.head, metrics, time, None, scheduler, logManager,
new AtomicBoolean(false), quotaManager, new AtomicBoolean(false), quotaManager,
new BrokerTopicStats, new MetadataCache(leaderBrokerId), new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager) new BrokerTopicStats, new MetadataCache(leaderBrokerId), new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager, configRepository)
//create the two replicas //create the two replicas
for ((p, _) <- fetchInfo) { for ((p, _) <- fetchInfo) {

View File

@ -31,10 +31,10 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.LazyOffsetCheckpoints import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.server.metadata.CachedConfigRepository
import kafka.utils.TestUtils.createBroker import kafka.utils.TestUtils.createBroker
import kafka.utils.timer.MockTimer import kafka.utils.timer.MockTimer
import kafka.utils.{MockScheduler, MockTime, TestUtils} import kafka.utils.{MockScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@ -67,7 +67,7 @@ class ReplicaManagerTest {
val time = new MockTime val time = new MockTime
val scheduler = new MockScheduler(time) val scheduler = new MockScheduler(time)
val metrics = new Metrics val metrics = new Metrics
var kafkaZkClient: KafkaZkClient = _ val configRepository = new CachedConfigRepository()
var alterIsrManager: AlterIsrManager = _ var alterIsrManager: AlterIsrManager = _
var config: KafkaConfig = _ var config: KafkaConfig = _
var quotaManager: QuotaManagers = _ var quotaManager: QuotaManagers = _
@ -80,10 +80,6 @@ class ReplicaManagerTest {
@BeforeEach @BeforeEach
def setUp(): Unit = { def setUp(): Unit = {
kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new Properties()).anyTimes()
EasyMock.replay(kafkaZkClient)
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
config = KafkaConfig.fromProps(props) config = KafkaConfig.fromProps(props)
alterIsrManager = EasyMock.createMock(classOf[AlterIsrManager]) alterIsrManager = EasyMock.createMock(classOf[AlterIsrManager])
@ -100,9 +96,9 @@ class ReplicaManagerTest {
@Test @Test
def testHighWaterMarkDirectoryMapping(): Unit = { def testHighWaterMarkDirectoryMapping(): Unit = {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats, new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository)
try { try {
val partition = rm.createPartition(new TopicPartition(topic, 1)) val partition = rm.createPartition(new TopicPartition(topic, 1))
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@ -120,9 +116,9 @@ class ReplicaManagerTest {
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats, new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository)
try { try {
val partition = rm.createPartition(new TopicPartition(topic, 1)) val partition = rm.createPartition(new TopicPartition(topic, 1))
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@ -137,9 +133,9 @@ class ReplicaManagerTest {
@Test @Test
def testIllegalRequiredAcks(): Unit = { def testIllegalRequiredAcks(): Unit = {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats, new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, Option(this.getClass.getName)) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository, Option(this.getClass.getName))
try { try {
def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = { def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS) assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@ -170,9 +166,9 @@ class ReplicaManagerTest {
val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache]) val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
EasyMock.replay(metadataCache) EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, val rm = new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats, new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager) metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository)
try { try {
val brokerList = Seq[Integer](0, 1).asJava val brokerList = Seq[Integer](0, 1).asJava
@ -1560,10 +1556,11 @@ class ReplicaManagerTest {
.setLeaderEpoch(leaderEpochFromLeader) .setLeaderEpoch(leaderEpochFromLeader)
.setEndOffset(offsetFromLeader)).asJava, .setEndOffset(offsetFromLeader)).asJava,
BrokerEndPoint(1, "host1" ,1), time) BrokerEndPoint(1, "host1" ,1), time)
val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr, val replicaManager = new ReplicaManager(config, metrics, time, None, mockScheduler, mockLogMgr,
new AtomicBoolean(false), quotaManager, mockBrokerTopicStats, new AtomicBoolean(false), quotaManager, mockBrokerTopicStats,
metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory, metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager) { mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, Option(this.getClass.getName),
configRepository, alterIsrManager) {
override protected def createReplicaFetcherManager(metrics: Metrics, override protected def createReplicaFetcherManager(metrics: Metrics,
time: Time, time: Time,
@ -1742,10 +1739,11 @@ class ReplicaManagerTest {
val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false) purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
new ReplicaManager(config, metrics, time, kafkaZkClient, scheduler, mockLogMgr, new ReplicaManager(config, metrics, time, None, scheduler, mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats, new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory, metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager) mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName),
configRepository, alterIsrManager)
} }
@Test @Test
@ -1956,12 +1954,12 @@ class ReplicaManagerTest {
EasyMock.replay(metadataCache1) EasyMock.replay(metadataCache1)
// each replica manager is for a broker // each replica manager is for a broker
val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr0, val rm0 = new ReplicaManager(config0, metrics, time, None, new MockScheduler(time), mockLogMgr0,
new AtomicBoolean(false), quotaManager, new AtomicBoolean(false), quotaManager,
brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager) brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager, configRepository)
val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr1, val rm1 = new ReplicaManager(config1, metrics, time, None, new MockScheduler(time), mockLogMgr1,
new AtomicBoolean(false), quotaManager, new AtomicBoolean(false), quotaManager,
brokerTopicStats2, metadataCache1, new LogDirFailureChannel(config1.logDirs.size), alterIsrManager) brokerTopicStats2, metadataCache1, new LogDirFailureChannel(config1.logDirs.size), alterIsrManager, configRepository)
(rm0, rm1) (rm0, rm1)
} }
@ -2202,9 +2200,9 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, new ReplicaManager(config, metrics, time, None, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats, new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) { new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository) {
override def getPartitionOrException(topicPartition: TopicPartition): Partition = { override def getPartitionOrException(topicPartition: TopicPartition): Partition = {
throw Errors.NOT_LEADER_OR_FOLLOWER.exception() throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
} }

View File

@ -17,7 +17,7 @@
package kafka.server package kafka.server
import kafka.zk.KafkaZkClient import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.easymock.EasyMock import org.easymock.EasyMock
import kafka.utils.TestUtils import kafka.utils.TestUtils
@ -25,7 +25,6 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.DescribeConfigsRequestData import org.apache.kafka.common.message.DescribeConfigsRequestData
import org.apache.kafka.common.message.DescribeConfigsResponseData import org.apache.kafka.common.message.DescribeConfigsResponseData
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertFalse
@ -33,6 +32,8 @@ import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.Assertions.assertNotEquals
import java.util.Properties import java.util.Properties
import kafka.server.metadata.ZkConfigRepository
class ZkAdminManagerTest { class ZkAdminManagerTest {
private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
@ -46,9 +47,9 @@ class ZkAdminManagerTest {
metrics.close() metrics.close()
} }
def createAdminManager(): ZkAdminManager = { def createConfigHelper(metadataCache: MetadataCache, zkClient: KafkaZkClient): ConfigHelper = {
val props = TestUtils.createBrokerConfig(brokerId, "zk") val props = TestUtils.createBrokerConfig(brokerId, "zk")
new ZkAdminManager(KafkaConfig.fromProps(props), metrics, metadataCache, zkClient) new ConfigHelper(metadataCache, KafkaConfig.fromProps(props), new ZkConfigRepository(new AdminZkClient(zkClient)))
} }
@Test @Test
@ -62,8 +63,8 @@ class ZkAdminManagerTest {
.setResourceName(topic) .setResourceName(topic)
.setResourceType(ConfigResource.Type.TOPIC.id) .setResourceType(ConfigResource.Type.TOPIC.id)
.setConfigurationKeys(null)) .setConfigurationKeys(null))
val adminManager = createAdminManager() val configHelper = createConfigHelper(metadataCache, zkClient)
val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = adminManager.describeConfigs(resources, true, true) val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = configHelper.describeConfigs(resources, true, true)
assertEquals(Errors.NONE.code, results.head.errorCode()) assertEquals(Errors.NONE.code, results.head.errorCode())
assertFalse(results.head.configs().isEmpty, "Should return configs") assertFalse(results.head.configs().isEmpty, "Should return configs")
} }
@ -78,8 +79,8 @@ class ZkAdminManagerTest {
val resources = List(new DescribeConfigsRequestData.DescribeConfigsResource() val resources = List(new DescribeConfigsRequestData.DescribeConfigsResource()
.setResourceName(topic) .setResourceName(topic)
.setResourceType(ConfigResource.Type.TOPIC.id)) .setResourceType(ConfigResource.Type.TOPIC.id))
val adminManager = createAdminManager() val configHelper = createConfigHelper(metadataCache, zkClient)
val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = adminManager.describeConfigs(resources, true, true) val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = configHelper.describeConfigs(resources, true, true)
assertEquals(Errors.NONE.code, results.head.errorCode()) assertEquals(Errors.NONE.code, results.head.errorCode())
assertFalse(results.head.configs().isEmpty, "Should return configs") assertFalse(results.head.configs().isEmpty, "Should return configs")
} }
@ -91,7 +92,7 @@ class ZkAdminManagerTest {
EasyMock.expect(metadataCache.contains(topic)).andReturn(true) EasyMock.expect(metadataCache.contains(topic)).andReturn(true)
EasyMock.replay(zkClient, metadataCache) EasyMock.replay(zkClient, metadataCache)
val adminManager = createAdminManager() val configHelper = createConfigHelper(metadataCache, zkClient)
val resources = List( val resources = List(
new DescribeConfigsRequestData.DescribeConfigsResource() new DescribeConfigsRequestData.DescribeConfigsResource()
@ -101,7 +102,7 @@ class ZkAdminManagerTest {
.setResourceName(brokerId.toString) .setResourceName(brokerId.toString)
.setResourceType(ConfigResource.Type.BROKER.id)) .setResourceType(ConfigResource.Type.BROKER.id))
val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = adminManager.describeConfigs(resources, true, true) val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = configHelper.describeConfigs(resources, true, true)
assertEquals(2, results.size) assertEquals(2, results.size)
results.foreach(r => { results.foreach(r => {
assertEquals(Errors.NONE.code, r.errorCode) assertEquals(Errors.NONE.code, r.errorCode)

View File

@ -22,10 +22,11 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.log.{Log, LogManager} import kafka.log.{Log, LogManager}
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server._ import kafka.server._
import kafka.server.metadata.CachedConfigRepository
import kafka.utils.{MockTime, TestUtils} import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderPartition} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{OffsetForLeaderTopicResult, EpochEndOffset} import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult}
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.record.RecordBatch
@ -41,6 +42,7 @@ class OffsetsForLeaderEpochTest {
private val time = new MockTime private val time = new MockTime
private val metrics = new Metrics private val metrics = new Metrics
private val alterIsrManager = TestUtils.createAlterIsrManager() private val alterIsrManager = TestUtils.createAlterIsrManager()
private val configRepository = new CachedConfigRepository()
private val tp = new TopicPartition("topic", 1) private val tp = new TopicPartition("topic", 1)
private var replicaManager: ReplicaManager = _ private var replicaManager: ReplicaManager = _
private var quotaManager: QuotaManagers = _ private var quotaManager: QuotaManagers = _
@ -65,9 +67,9 @@ class OffsetsForLeaderEpochTest {
replay(mockLog, logManager) replay(mockLog, logManager)
// create a replica manager with 1 partition that has 1 replica // create a replica manager with 1 partition that has 1 replica
replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false), replicaManager = new ReplicaManager(config, metrics, time, None, null, logManager, new AtomicBoolean(false),
quotaManager, new BrokerTopicStats, quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository)
val partition = replicaManager.createPartition(tp) val partition = replicaManager.createPartition(tp)
partition.setLog(mockLog, isFutureLog = false) partition.setLog(mockLog, isFutureLog = false)
partition.leaderReplicaIdOpt = Some(config.brokerId) partition.leaderReplicaIdOpt = Some(config.brokerId)
@ -88,9 +90,9 @@ class OffsetsForLeaderEpochTest {
replay(logManager) replay(logManager)
//create a replica manager with 1 partition that has 0 replica //create a replica manager with 1 partition that has 0 replica
replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false), replicaManager = new ReplicaManager(config, metrics, time, None, null, logManager, new AtomicBoolean(false),
quotaManager, new BrokerTopicStats, quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository)
replicaManager.createPartition(tp) replicaManager.createPartition(tp)
//Given //Given
@ -113,9 +115,9 @@ class OffsetsForLeaderEpochTest {
replay(logManager) replay(logManager)
//create a replica manager with 0 partition //create a replica manager with 0 partition
replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false), replicaManager = new ReplicaManager(config, metrics, time, None, null, logManager, new AtomicBoolean(false),
quotaManager, new BrokerTopicStats, quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, configRepository)
//Given //Given
val epochRequested: Integer = 5 val epochRequested: Integer = 5

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util.Properties
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class CachedConfigRepositoryTest {
@Test
def testEmptyRepository(): Unit = {
val repository = new CachedConfigRepository()
assertEquals(new Properties(), repository.brokerConfig(0))
assertEquals(new Properties(), repository.topicConfig("foo"))
}
@Test
def testSetBrokerConfig(): Unit = {
val repository = new CachedConfigRepository()
val brokerId0 = 0
repository.setBrokerConfig(brokerId0, "foo", null)
assertEquals(new Properties(), repository.brokerConfig(0))
val brokerId1 = 1
repository.setBrokerConfig(brokerId1, "foo", "bar")
val brokerProperties = new Properties()
brokerProperties.put("foo", "bar")
assertEquals(brokerProperties, repository.brokerConfig(brokerId1))
val brokerProperties2 = new Properties()
brokerProperties2.put("foo", "bar")
brokerProperties2.put("foo2", "baz")
repository.setBrokerConfig(brokerId1, "foo2", "baz") // add another prop
assertEquals(brokerProperties2, repository.brokerConfig(brokerId1)) // should get both props
repository.setBrokerConfig(brokerId1, "foo2", null)
assertEquals(brokerProperties, repository.brokerConfig(brokerId1))
}
@Test
def testSetTopicConfig(): Unit = {
val repository = new CachedConfigRepository()
val topic0 = "topic0"
repository.setTopicConfig(topic0, "foo", null)
assertEquals(new Properties(), repository.brokerConfig(0))
val topic1 = "topic1"
repository.setTopicConfig(topic1, "foo", "bar")
val topicProperties = new Properties()
topicProperties.put("foo", "bar")
assertEquals(topicProperties, repository.topicConfig(topic1))
val topicProperties2 = new Properties()
topicProperties2.put("foo", "bar")
topicProperties2.put("foo2", "baz")
repository.setTopicConfig(topic1, "foo2", "baz") // add another prop
assertEquals(topicProperties2, repository.topicConfig(topic1)) // should get both props
repository.setTopicConfig(topic1, "foo2", null)
assertEquals(topicProperties, repository.topicConfig(topic1))
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.server.metadata.ZkConfigRepository
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.mockito.Mockito.{mock, when}
class ZkConfigRepositoryTest {
@Test
def testZkConfigRepository(): Unit = {
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
val zkConfigRepository = ZkConfigRepository(zkClient)
val brokerId = 1
val topic = "topic"
val brokerProps = new Properties()
brokerProps.put("a", "b")
val topicProps = new Properties()
topicProps.put("c", "d")
when(zkClient.getEntityConfigs(ConfigType.Broker, brokerId.toString)).thenReturn(brokerProps)
when(zkClient.getEntityConfigs(ConfigType.Topic, topic)).thenReturn(topicProps)
assertEquals(brokerProps, zkConfigRepository.brokerConfig(brokerId))
assertEquals(topicProps, zkConfigRepository.topicConfig(topic))
}
@Test
def testUnsupportedTypes(): Unit = {
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
val zkConfigRepository = ZkConfigRepository(zkClient)
Type.values().foreach(value => if (value != Type.BROKER && value != Type.TOPIC)
assertThrows(classOf[IllegalArgumentException], () => zkConfigRepository.config(new ConfigResource(value, value.toString))))
}
}

View File

@ -17,15 +17,12 @@
package kafka.utils package kafka.utils
import kafka.server.{KafkaConfig, ReplicaFetcherManager, ReplicaManager}
import kafka.api.LeaderAndIsr import kafka.api.LeaderAndIsr
import kafka.controller.LeaderIsrAndControllerEpoch import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.{Log, LogManager}
import kafka.zk._ import kafka.zk._
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.api.{BeforeEach, Test}
import org.easymock.EasyMock
class ReplicationUtilsTest extends ZooKeeperTestHarness { class ReplicationUtilsTest extends ZooKeeperTestHarness {
private val zkVersion = 1 private val zkVersion = 1
@ -48,23 +45,6 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
@Test @Test
def testUpdateLeaderAndIsr(): Unit = { def testUpdateLeaderAndIsr(): Unit = {
val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
val log: Log = EasyMock.createMock(classOf[Log])
EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
EasyMock.expect(log)
EasyMock.replay(log)
val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(logManager.getLog(new TopicPartition(topic, partition), false)).andReturn(Some(log)).anyTimes()
EasyMock.replay(logManager)
val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(replicaManager.config).andReturn(configs.head)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
EasyMock.replay(replicaManager)
zkClient.makeSurePersistentPathExists(IsrChangeNotificationZNode.path) zkClient.makeSurePersistentPathExists(IsrChangeNotificationZNode.path)
val replicas = List(0, 1) val replicas = List(0, 1)

View File

@ -29,7 +29,7 @@ import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager import javax.net.ssl.X509TrustManager
import kafka.api._ import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener, TopicConfigFetcher} import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.log._ import kafka.log._
import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer} import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
import kafka.server._ import kafka.server._
@ -37,7 +37,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
import com.yammer.metrics.core.Meter import com.yammer.metrics.core.Meter
import kafka.controller.LeaderIsrAndControllerEpoch import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.metrics.KafkaYammerMetrics import kafka.metrics.KafkaYammerMetrics
import kafka.server.metadata.MetadataBroker import kafka.server.metadata.{ConfigRepository, CachedConfigRepository, MetadataBroker}
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import kafka.zk._ import kafka.zk._
import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.CommonClientConfigs
@ -1155,12 +1155,10 @@ object TestUtils extends Logging {
new MockIsrChangeListener() new MockIsrChangeListener()
} }
class MockTopicConfigFetcher(var props: Properties) extends TopicConfigFetcher { def createConfigRepository(topic: String, props: Properties): ConfigRepository = {
override def fetch(): Properties = props val configRepository = new CachedConfigRepository()
} props.entrySet().forEach(e => configRepository.setTopicConfig(topic, e.getKey.toString, e.getValue.toString))
configRepository
def createTopicConfigProvider(props: Properties): MockTopicConfigFetcher = {
new MockTopicConfigFetcher(props)
} }
def produceMessages(servers: Seq[KafkaServer], def produceMessages(servers: Seq[KafkaServer],

View File

@ -40,6 +40,7 @@ import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota; import kafka.server.ReplicaQuota;
import kafka.server.checkpoints.OffsetCheckpoints; import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.CachedConfigRepository;
import kafka.utils.KafkaScheduler; import kafka.utils.KafkaScheduler;
import kafka.utils.Pool; import kafka.utils.Pool;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@ -158,7 +159,7 @@ public class ReplicaFetcherThreadBenchmark {
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L)); Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class); AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class);
Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(), Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
0, Time.SYSTEM, Properties::new, isrChangeListener, new DelayedOperationsMock(tp), 0, Time.SYSTEM, new CachedConfigRepository(), isrChangeListener, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager, isrChannelManager); Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
partition.makeFollower(partitionState, offsetCheckpoints); partition.makeFollower(partitionState, offsetCheckpoints);

View File

@ -37,6 +37,7 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager; import kafka.server.ReplicationQuotaManager;
import kafka.server.ZkAdminManager; import kafka.server.ZkAdminManager;
import kafka.server.metadata.CachedConfigRepository;
import kafka.zk.KafkaZkClient; import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
@ -178,6 +179,7 @@ public class MetadataRequestBenchmark {
kafkaZkClient, kafkaZkClient,
brokerId, brokerId,
new KafkaConfig(kafkaProps), new KafkaConfig(kafkaProps),
new CachedConfigRepository(),
metadataCache, metadataCache,
metrics, metrics,
Option.empty(), Option.empty(),

View File

@ -30,6 +30,7 @@ import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel; import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache; import kafka.server.MetadataCache;
import kafka.server.checkpoints.OffsetCheckpoints; import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.CachedConfigRepository;
import kafka.utils.KafkaScheduler; import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData; import org.apache.kafka.common.message.LeaderAndIsrRequestData;
@ -121,7 +122,7 @@ public class PartitionMakeFollowerBenchmark {
AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class); AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
partition = new Partition(tp, 100, partition = new Partition(tp, 100,
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
Properties::new, isrChangeListener, delayedOperations, new CachedConfigRepository(), isrChangeListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterIsrManager); Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
partition.createLogIfNotExists(true, false, offsetCheckpoints); partition.createLogIfNotExists(true, false, offsetCheckpoints);
executorService.submit((Runnable) () -> { executorService.submit((Runnable) () -> {

View File

@ -31,6 +31,7 @@ import kafka.server.LogDirFailureChannel;
import kafka.server.LogOffsetMetadata; import kafka.server.LogOffsetMetadata;
import kafka.server.MetadataCache; import kafka.server.MetadataCache;
import kafka.server.checkpoints.OffsetCheckpoints; import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.CachedConfigRepository;
import kafka.utils.KafkaScheduler; import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
@ -119,7 +120,7 @@ public class UpdateFollowerFetchStateBenchmark {
AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class); AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
partition = new Partition(topicPartition, 100, partition = new Partition(topicPartition, 100,
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
Properties::new, isrChangeListener, delayedOperations, new CachedConfigRepository(), isrChangeListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterIsrManager); Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
partition.makeLeader(partitionState, offsetCheckpoints); partition.makeLeader(partitionState, offsetCheckpoints);
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.kafka.jmh.server; package org.apache.kafka.jmh.server;
import java.util.Properties;
import kafka.cluster.Partition; import kafka.cluster.Partition;
import kafka.log.CleanerConfig; import kafka.log.CleanerConfig;
import kafka.log.LogConfig; import kafka.log.LogConfig;
@ -29,14 +28,13 @@ import kafka.server.MetadataCache;
import kafka.server.QuotaFactory; import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
import kafka.server.checkpoints.OffsetCheckpoints; import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.CachedConfigRepository;
import kafka.utils.KafkaScheduler; import kafka.utils.KafkaScheduler;
import kafka.utils.MockTime; import kafka.utils.MockTime;
import kafka.utils.Scheduler; import kafka.utils.Scheduler;
import kafka.utils.TestUtils; import kafka.utils.TestUtils;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Fork;
@ -89,6 +87,7 @@ public class CheckpointBench {
private LogDirFailureChannel failureChannel; private LogDirFailureChannel failureChannel;
private LogManager logManager; private LogManager logManager;
private AlterIsrManager alterIsrManager; private AlterIsrManager alterIsrManager;
private final CachedConfigRepository configRepository = new CachedConfigRepository();
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -117,18 +116,12 @@ public class CheckpointBench {
this.metrics, this.metrics,
this.time, ""); this.time, "");
KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
@Override
public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
return new Properties();
}
};
this.alterIsrManager = TestUtils.createAlterIsrManager(); this.alterIsrManager = TestUtils.createAlterIsrManager();
this.replicaManager = new ReplicaManager( this.replicaManager = new ReplicaManager(
this.brokerProperties, this.brokerProperties,
this.metrics, this.metrics,
this.time, this.time,
zkClient, Option.empty(),
this.scheduler, this.scheduler,
this.logManager, this.logManager,
new AtomicBoolean(false), new AtomicBoolean(false),
@ -137,6 +130,7 @@ public class CheckpointBench {
metadataCache, metadataCache,
this.failureChannel, this.failureChannel,
alterIsrManager, alterIsrManager,
configRepository,
Option.empty()); Option.empty());
replicaManager.startup(); replicaManager.startup();