MINOR: Refactor DynamicConfig (#16133)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-05-31 19:09:46 +02:00 committed by GitHub
parent 7e81cc5e68
commit b6d0fb055d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 39 additions and 31 deletions

View File

@ -110,13 +110,15 @@ object DynamicBrokerConfig {
private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
private val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
private val DynamicPasswordConfigs = {
val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet
AllDynamicConfigs.intersect(passwordConfigs)
}
private val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala
def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith)
def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
@ -166,7 +168,7 @@ object DynamicBrokerConfig {
}
private def nonDynamicConfigs(props: Properties): Set[String] = {
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
props.asScala.keySet.intersect(nonDynamicProps)
}
private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
@ -319,7 +321,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains)
val nonDynamic = configNames.intersect(nonDynamicProps)
require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic")
}
@ -674,11 +676,10 @@ trait BrokerReconfigurable {
object DynamicLogConfig {
// Exclude message.format.version for now since we need to check that the version
// is supported on all brokers in the cluster.
@nowarn("cat=deprecation")
val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)
val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet -- ExcludedConfigs
val KafkaConfigToLogConfigName = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
val ReconfigurableConfigs: Set[String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
val KafkaConfigToLogConfigName: Map[String, String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
}
class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging {

View File

@ -20,9 +20,6 @@ package kafka.server
import java.net.{InetAddress, UnknownHostException}
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._
import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}
import java.util
@ -35,48 +32,44 @@ import scala.jdk.CollectionConverters._
object DynamicConfig {
object Broker {
// Definitions
val brokerConfigDef = new ConfigDef()
// Round minimum value down, to make it easier for users.
.define(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC)
DynamicBrokerConfig.addDynamicConfigs(brokerConfigDef)
val nonDynamicProps = KafkaConfig.configNames.toSet -- brokerConfigDef.names.asScala
private val brokerConfigs = QuotaConfigs.brokerQuotaConfigs()
DynamicBrokerConfig.addDynamicConfigs(brokerConfigs)
def names = brokerConfigDef.names
def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys
def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
def names: util.Set[String] = brokerConfigs.names
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(brokerConfigs, props, customPropsAllowed = true)
}
object Client {
private val clientConfigs = QuotaConfigs.userAndClientQuotaConfigs()
def configKeys = clientConfigs.configKeys
def configKeys: util.Map[String, ConfigDef.ConfigKey] = clientConfigs.configKeys
def names = clientConfigs.names
def names: util.Set[String] = clientConfigs.names
def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
}
object User {
private val userConfigs = QuotaConfigs.scramMechanismsPlusUserAndClientQuotaConfigs()
def configKeys = userConfigs.configKeys
def configKeys: util.Map[String, ConfigDef.ConfigKey] = userConfigs.configKeys
def names = userConfigs.names
def names: util.Set[String] = userConfigs.names
def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
}
object Ip {
private val ipConfigs = QuotaConfigs.ipConfigs()
def configKeys = ipConfigs.configKeys
def configKeys: util.Map[String, ConfigDef.ConfigKey] = ipConfigs.configKeys
def names = ipConfigs.names
def names: util.Set[String] = ipConfigs.names
def validate(props: Properties) = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)
def isValidIpEntity(ip: String): Boolean = {
if (ip != ZooKeeperInternals.DEFAULT_STRING) {

View File

@ -497,7 +497,7 @@ object KafkaConfig {
if (configType != null) {
Some(configType)
} else {
val configKey = DynamicConfig.Broker.brokerConfigDef.configKeys().get(exactName)
val configKey = DynamicConfig.Broker.configKeys.get(exactName)
if (configKey != null) {
Some(configKey.`type`)
} else {

View File

@ -121,6 +121,20 @@ public class QuotaConfigs {
ConfigDef.Importance.MEDIUM, CONTROLLER_MUTATION_RATE_DOC);
}
public static ConfigDef brokerQuotaConfigs() {
return new ConfigDef()
// Round minimum value down, to make it easier for users.
.define(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, ConfigDef.Type.LONG,
QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM, QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, ConfigDef.Type.LONG,
QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM, QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, ConfigDef.Type.LONG,
QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM, QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC);
}
public static ConfigDef userAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
buildUserClientQuotaConfigDef(configDef);