From b6d0fb055d1b74c918f317027760e0bb7cf929a3 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Fri, 31 May 2024 19:09:46 +0200 Subject: [PATCH] MINOR: Refactor DynamicConfig (#16133) Reviewers: Chia-Ping Tsai --- .../kafka/server/DynamicBrokerConfig.scala | 17 +++++---- .../scala/kafka/server/DynamicConfig.scala | 37 ++++++++----------- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../kafka/server/config/QuotaConfigs.java | 14 +++++++ 4 files changed, 39 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 6fa43b560dd..c9bb2e3b4ff 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -110,13 +110,15 @@ object DynamicBrokerConfig { private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) - val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r + private val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r private val DynamicPasswordConfigs = { val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet AllDynamicConfigs.intersect(passwordConfigs) } + private val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala + def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith) def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = { @@ -166,7 +168,7 @@ object DynamicBrokerConfig { } private def nonDynamicConfigs(props: Properties): Set[String] = { - props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps) + props.asScala.keySet.intersect(nonDynamicProps) } private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = { @@ -319,7 +321,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) { - val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains) + val nonDynamic = configNames.intersect(nonDynamicProps) require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic") } @@ -674,11 +676,10 @@ trait BrokerReconfigurable { object DynamicLogConfig { // Exclude message.format.version for now since we need to check that the version // is supported on all brokers in the cluster. - @nowarn("cat=deprecation") - val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG) - - val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet -- ExcludedConfigs - val KafkaConfigToLogConfigName = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) } + val ReconfigurableConfigs: Set[String] = + ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG + val KafkaConfigToLogConfigName: Map[String, String] = + ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) } } class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging { diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index 5a5e3893179..955103bea6f 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -20,9 +20,6 @@ package kafka.server import java.net.{InetAddress, UnknownHostException} import java.util.Properties import org.apache.kafka.common.config.ConfigDef -import org.apache.kafka.common.config.ConfigDef.Importance._ -import org.apache.kafka.common.config.ConfigDef.Range._ -import org.apache.kafka.common.config.ConfigDef.Type._ import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals} import java.util @@ -35,48 +32,44 @@ import scala.jdk.CollectionConverters._ object DynamicConfig { object Broker { - // Definitions - val brokerConfigDef = new ConfigDef() - // Round minimum value down, to make it easier for users. - .define(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_DOC) - .define(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_DOC) - .define(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC) - DynamicBrokerConfig.addDynamicConfigs(brokerConfigDef) - val nonDynamicProps = KafkaConfig.configNames.toSet -- brokerConfigDef.names.asScala + private val brokerConfigs = QuotaConfigs.brokerQuotaConfigs() + DynamicBrokerConfig.addDynamicConfigs(brokerConfigs) - def names = brokerConfigDef.names + def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys - def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true) + def names: util.Set[String] = brokerConfigs.names + + def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(brokerConfigs, props, customPropsAllowed = true) } object Client { private val clientConfigs = QuotaConfigs.userAndClientQuotaConfigs() - def configKeys = clientConfigs.configKeys + def configKeys: util.Map[String, ConfigDef.ConfigKey] = clientConfigs.configKeys - def names = clientConfigs.names + def names: util.Set[String] = clientConfigs.names - def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false) + def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false) } object User { private val userConfigs = QuotaConfigs.scramMechanismsPlusUserAndClientQuotaConfigs() - def configKeys = userConfigs.configKeys + def configKeys: util.Map[String, ConfigDef.ConfigKey] = userConfigs.configKeys - def names = userConfigs.names + def names: util.Set[String] = userConfigs.names - def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false) + def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false) } object Ip { private val ipConfigs = QuotaConfigs.ipConfigs() - def configKeys = ipConfigs.configKeys + def configKeys: util.Map[String, ConfigDef.ConfigKey] = ipConfigs.configKeys - def names = ipConfigs.names + def names: util.Set[String] = ipConfigs.names - def validate(props: Properties) = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false) + def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false) def isValidIpEntity(ip: String): Boolean = { if (ip != ZooKeeperInternals.DEFAULT_STRING) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c807ce5d6b6..82231083829 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -497,7 +497,7 @@ object KafkaConfig { if (configType != null) { Some(configType) } else { - val configKey = DynamicConfig.Broker.brokerConfigDef.configKeys().get(exactName) + val configKey = DynamicConfig.Broker.configKeys.get(exactName) if (configKey != null) { Some(configKey.`type`) } else { diff --git a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java index 88b49b82827..1005672a8d6 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java @@ -121,6 +121,20 @@ public class QuotaConfigs { ConfigDef.Importance.MEDIUM, CONTROLLER_MUTATION_RATE_DOC); } + public static ConfigDef brokerQuotaConfigs() { + return new ConfigDef() + // Round minimum value down, to make it easier for users. + .define(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, ConfigDef.Type.LONG, + QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0), + ConfigDef.Importance.MEDIUM, QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_DOC) + .define(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, ConfigDef.Type.LONG, + QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0), + ConfigDef.Importance.MEDIUM, QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_DOC) + .define(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, ConfigDef.Type.LONG, + QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0), + ConfigDef.Importance.MEDIUM, QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC); + } + public static ConfigDef userAndClientQuotaConfigs() { ConfigDef configDef = new ConfigDef(); buildUserClientQuotaConfigDef(configDef);