rework the reconfiguration logic a bit to fix the non-reconfigurable log config

Colin P. McCabe 2024-09-25 15:35:03 -07:00
parent 0a2752a52b
commit d22d58da46
2 changed files with 28 additions and 12 deletions

View File

@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.config.{ConfigType, ServerConfigs, ReplicationConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
+import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
 import org.apache.kafka.server.telemetry.ClientTelemetry
@@ -662,12 +662,24 @@ trait BrokerReconfigurable {
 }
 
 object DynamicLogConfig {
-  // Exclude message.format.version for now since we need to check that the version
-  // is supported on all brokers in the cluster.
+  /**
+   * The log configurations that are non-reconfigurable. This set contains the names you
+   * would use when setting a dynamic configuration on a topic, which are different than the
+   * corresponding broker configuration names.
+   *
+   * For now, message.format.version is not reconfigurable, since we need to check that
+   * the version is supported on all brokers in the cluster.
+   */
+  val NonReconfigrableLogConfigs: Set[String] = Set(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
+
+  /**
+   * The broker configurations pertaining to logs that are reconfigurable. This set contains
+   * the names you would use when setting a static or dynamic broker configuration (not topic
+   * configuration).
+   */
   val ReconfigurableConfigs: Set[String] =
-    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
-  val KafkaConfigToLogConfigName: Map[String, String] =
-    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
+    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.
+      filterNot(s => NonReconfigrableLogConfigs.contains(s._1)).values.toSet
 }
 
 class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging {
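
Not part of the commit: a small sketch of how the new ReconfigurableConfigs derivation behaves. TOPIC_CONFIG_SYNONYMS maps topic-level config names to broker-level names, so the exclusion is keyed on the topic-level name; the two-entry map below is a made-up stand-in for the real synonym table.

// Stand-in for ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS (topic name -> broker name).
val topicConfigSynonyms: Map[String, String] = Map(
  "message.format.version" -> "log.message.format.version",
  "segment.bytes"          -> "log.segment.bytes")
val nonReconfigurableLogConfigs: Set[String] = Set("message.format.version")

// Same shape as the new derivation: drop the non-reconfigurable topic-level keys,
// then keep the broker-level names of whatever remains.
val reconfigurableConfigs: Set[String] =
  topicConfigSynonyms.filterNot(s => nonReconfigurableLogConfigs.contains(s._1)).values.toSet
// reconfigurableConfigs == Set("log.segment.bytes")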
@@ -733,9 +745,11 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
     val originalLogConfig = logManager.currentDefaultConfig
     val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
-    originalLogConfig.originals().forEach((k, v) => {
-      if (!DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
-        newBrokerDefaults.put(k, v)
+    val originalLogConfigMap = originalLogConfig.originals()
+    DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
+      Option(originalLogConfigMap.get(k)) match {
+        case None => newBrokerDefaults.remove(k)
+        case Some(v) => newBrokerDefaults.put(k, v)
       }
     })
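
Not part of the commit: a self-contained sketch of the behavioral change in the reconfigure step above, using made-up values. The old loop copied every original entry outside ReconfigurableConfigs back into the new defaults; the new block only touches the non-reconfigurable keys, and drops them from the new defaults when the original config never set them.

import java.util

object NonReconfigurableDefaultsSketch extends App {
  val nonReconfigurableLogConfigs = Set("message.format.version")

  // Log-config defaults extracted from the newly applied broker config (made-up values).
  val newBrokerDefaults = new util.HashMap[String, AnyRef]()
  newBrokerDefaults.put("segment.bytes", "4000")
  newBrokerDefaults.put("message.format.version", "3.0-IV1")

  // Originals of the current default log config; message.format.version was never set here.
  val originalLogConfigMap = new util.HashMap[String, AnyRef]()
  originalLogConfigMap.put("segment.bytes", "1073741824")

  // Same pattern as the new block: restore the original value of each
  // non-reconfigurable key, or remove the key if the original never had it.
  nonReconfigurableLogConfigs.foreach { k =>
    Option(originalLogConfigMap.get(k)) match {
      case None => newBrokerDefaults.remove(k)
      case Some(v) => newBrokerDefaults.put(k, v)
    }
  }

  println(newBrokerDefaults) // {segment.bytes=4000}: the stale message.format.version is gone
}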

View File

@@ -64,7 +64,7 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
-import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
 import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.ShutdownableThread
@@ -675,8 +675,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
     TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
+    val KafkaConfigToLogConfigName: Map[String, String] =
+      ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
     props.asScala.foreach { case (k, v) =>
-      val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
+      val logConfigName = KafkaConfigToLogConfigName(k)
       val expectedValue = if (k == ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG) s"[$v]" else v
       assertEquals(expectedValue, log.config.originals.get(logConfigName).toString,
         s"Not reconfigured $logConfigName for existing log")