mirror of https://github.com/apache/kafka.git
KAFKA-17584: Fix incorrect synonym handling for dynamic log configurations (#17258)
Several Kafka log configurations in have synonyms. For example, log retention can be configured either by log.retention.ms, or by log.retention.minutes, or by log.retention.hours. There is also a faculty in Kafka to dynamically change broker configurations without restarting the broker. These dynamically set configurations are stored in the metadata log and override what is in the broker properties file. Unfortunately, these two features interacted poorly; there was a bug where the dynamic log configuration update code ignored synonyms. For example, if you set log.retention.minutes and then reconfigured something unrelated that triggered the LogConfig update path, the retention value that you had configured was overwritten. The reason for this was incorrect handling of synonyms. The code tried to treat the Kafka broker configuration as a bag of key/value entities rather than extracting the correct retention time (or other setting with overrides) from the KafkaConfig object. Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>, Federico Valeri <fedevaleri@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, amangandhi94 <>
This commit is contained in:
parent
e36c82d71c
commit
57b098c397
|
@ -30,7 +30,7 @@ import kafka.utils.Implicits._
|
||||||
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||||
import org.apache.kafka.common.Reconfigurable
|
import org.apache.kafka.common.Reconfigurable
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
|
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
|
||||||
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
|
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
|
||||||
import org.apache.kafka.common.config.types.Password
|
import org.apache.kafka.common.config.types.Password
|
||||||
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
|
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
|
||||||
|
@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
|
||||||
import org.apache.kafka.network.SocketServerConfigs
|
import org.apache.kafka.network.SocketServerConfigs
|
||||||
import org.apache.kafka.security.PasswordEncoder
|
import org.apache.kafka.security.PasswordEncoder
|
||||||
import org.apache.kafka.server.ProcessRole
|
import org.apache.kafka.server.ProcessRole
|
||||||
import org.apache.kafka.server.config.{ConfigType, ServerConfigs, ReplicationConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
|
import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
|
||||||
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
|
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
|
||||||
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
|
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
|
||||||
import org.apache.kafka.server.telemetry.ClientTelemetry
|
import org.apache.kafka.server.telemetry.ClientTelemetry
|
||||||
|
@ -662,12 +662,24 @@ trait BrokerReconfigurable {
|
||||||
}
|
}
|
||||||
|
|
||||||
object DynamicLogConfig {
|
object DynamicLogConfig {
|
||||||
// Exclude message.format.version for now since we need to check that the version
|
/**
|
||||||
// is supported on all brokers in the cluster.
|
* The log configurations that are non-reconfigurable. This set contains the names you
|
||||||
|
* would use when setting a dynamic configuration on a topic, which are different than the
|
||||||
|
* corresponding broker configuration names.
|
||||||
|
*
|
||||||
|
* For now, message.format.version is not reconfigurable, since we need to check that
|
||||||
|
* the version is supported on all brokers in the cluster.
|
||||||
|
*/
|
||||||
|
val NonReconfigrableLogConfigs: Set[String] = Set(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The broker configurations pertaining to logs that are reconfigurable. This set contains
|
||||||
|
* the names you would use when setting a static or dynamic broker configuration (not topic
|
||||||
|
* configuration).
|
||||||
|
*/
|
||||||
val ReconfigurableConfigs: Set[String] =
|
val ReconfigurableConfigs: Set[String] =
|
||||||
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
|
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.
|
||||||
val KafkaConfigToLogConfigName: Map[String, String] =
|
filterNot(s => NonReconfigrableLogConfigs.contains(s._1)).values.toSet
|
||||||
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging {
|
class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging {
|
||||||
|
@ -732,17 +744,14 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
|
||||||
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
|
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
|
||||||
val originalLogConfig = logManager.currentDefaultConfig
|
val originalLogConfig = logManager.currentDefaultConfig
|
||||||
val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
|
val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
|
||||||
val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals)
|
val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
|
||||||
newConfig.valuesFromThisConfig.forEach { (k, v) =>
|
val originalLogConfigMap = originalLogConfig.originals()
|
||||||
if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
|
DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
|
||||||
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
|
Option(originalLogConfigMap.get(k)) match {
|
||||||
if (v == null)
|
case None => newBrokerDefaults.remove(k)
|
||||||
newBrokerDefaults.remove(configName)
|
case Some(v) => newBrokerDefaults.put(k, v)
|
||||||
else
|
|
||||||
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
|
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
|
||||||
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
|
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
|
||||||
import org.apache.kafka.network.SocketServerConfigs
|
import org.apache.kafka.network.SocketServerConfigs
|
||||||
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
|
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
|
||||||
import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
|
import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
|
||||||
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
|
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
|
||||||
import org.apache.kafka.server.record.BrokerCompressionType
|
import org.apache.kafka.server.record.BrokerCompressionType
|
||||||
import org.apache.kafka.server.util.ShutdownableThread
|
import org.apache.kafka.server.util.ShutdownableThread
|
||||||
|
@ -667,8 +667,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
|
||||||
|
|
||||||
val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
|
val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
|
||||||
TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
|
TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
|
||||||
|
val KafkaConfigToLogConfigName: Map[String, String] =
|
||||||
|
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
|
||||||
props.asScala.foreach { case (k, v) =>
|
props.asScala.foreach { case (k, v) =>
|
||||||
val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
|
val logConfigName = KafkaConfigToLogConfigName(k)
|
||||||
val expectedValue = if (k == ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG) s"[$v]" else v
|
val expectedValue = if (k == ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG) s"[$v]" else v
|
||||||
assertEquals(expectedValue, log.config.originals.get(logConfigName).toString,
|
assertEquals(expectedValue, log.config.originals.get(logConfigName).toString,
|
||||||
s"Not reconfigured $logConfigName for existing log")
|
s"Not reconfigured $logConfigName for existing log")
|
||||||
|
|
|
@ -19,7 +19,7 @@ package kafka.server
|
||||||
|
|
||||||
import java.{lang, util}
|
import java.{lang, util}
|
||||||
import java.util.{Properties, Map => JMap}
|
import java.util.{Properties, Map => JMap}
|
||||||
import java.util.concurrent.CompletionStage
|
import java.util.concurrent.{CompletionStage, TimeUnit}
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import kafka.controller.KafkaController
|
import kafka.controller.KafkaController
|
||||||
import kafka.log.LogManager
|
import kafka.log.LogManager
|
||||||
|
@ -57,7 +57,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testConfigUpdate(): Unit = {
|
def testConfigUpdate(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
val oldKeystore = "oldKs.jks"
|
val oldKeystore = "oldKs.jks"
|
||||||
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
|
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
|
||||||
val config = KafkaConfig(props)
|
val config = KafkaConfig(props)
|
||||||
|
@ -102,7 +102,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testEnableDefaultUncleanLeaderElection(): Unit = {
|
def testEnableDefaultUncleanLeaderElection(): Unit = {
|
||||||
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
origProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
|
origProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
|
||||||
|
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
|
@ -133,7 +133,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testUpdateDynamicThreadPool(): Unit = {
|
def testUpdateDynamicThreadPool(): Unit = {
|
||||||
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
origProps.put(ServerConfigs.NUM_IO_THREADS_CONFIG, "4")
|
origProps.put(ServerConfigs.NUM_IO_THREADS_CONFIG, "4")
|
||||||
origProps.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2")
|
origProps.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2")
|
||||||
origProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
|
origProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
|
||||||
|
@ -205,7 +205,7 @@ class DynamicBrokerConfigTest {
|
||||||
@nowarn("cat=deprecation")
|
@nowarn("cat=deprecation")
|
||||||
@Test
|
@Test
|
||||||
def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
|
def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
|
||||||
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
|
origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
config.dynamicConfig.initialize(None, None)
|
config.dynamicConfig.initialize(None, None)
|
||||||
|
@ -227,7 +227,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
|
def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
|
||||||
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "100000000")
|
origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "100000000")
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
config.dynamicConfig.initialize(None, None)
|
config.dynamicConfig.initialize(None, None)
|
||||||
|
@ -261,7 +261,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testReconfigurableValidation(): Unit = {
|
def testReconfigurableValidation(): Unit = {
|
||||||
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
val invalidReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, ServerConfigs.BROKER_ID_CONFIG, "some.prop")
|
val invalidReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, ServerConfigs.BROKER_ID_CONFIG, "some.prop")
|
||||||
val validReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "some.prop")
|
val validReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "some.prop")
|
||||||
|
@ -331,7 +331,7 @@ class DynamicBrokerConfigTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
|
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
|
||||||
val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val configProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret")
|
configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret")
|
||||||
val config = KafkaConfig(configProps)
|
val config = KafkaConfig(configProps)
|
||||||
config.dynamicConfig.initialize(None, None)
|
config.dynamicConfig.initialize(None, None)
|
||||||
|
@ -440,7 +440,7 @@ class DynamicBrokerConfigTest {
|
||||||
def testDynamicListenerConfig(): Unit = {
|
def testDynamicListenerConfig(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
|
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
|
||||||
val oldConfig = KafkaConfig.fromProps(props)
|
val oldConfig = KafkaConfig.fromProps(props)
|
||||||
val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
|
val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
|
||||||
when(kafkaServer.config).thenReturn(oldConfig)
|
when(kafkaServer.config).thenReturn(oldConfig)
|
||||||
|
|
||||||
props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
|
props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
|
||||||
|
@ -480,11 +480,11 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testAuthorizerConfig(): Unit = {
|
def testAuthorizerConfig(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
|
val props = TestUtils.createBrokerConfig(0, null, port = 9092)
|
||||||
val oldConfig = KafkaConfig.fromProps(props)
|
val oldConfig = KafkaConfig.fromProps(props)
|
||||||
oldConfig.dynamicConfig.initialize(None, None)
|
oldConfig.dynamicConfig.initialize(None, None)
|
||||||
|
|
||||||
val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
|
val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
|
||||||
when(kafkaServer.config).thenReturn(oldConfig)
|
when(kafkaServer.config).thenReturn(oldConfig)
|
||||||
when(kafkaServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
|
when(kafkaServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
|
||||||
val metrics: Metrics = mock(classOf[Metrics])
|
val metrics: Metrics = mock(classOf[Metrics])
|
||||||
|
@ -630,7 +630,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testImproperConfigsAreRemoved(): Unit = {
|
def testImproperConfigsAreRemoved(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
|
val props = TestUtils.createBrokerConfig(0, null)
|
||||||
val config = KafkaConfig(props)
|
val config = KafkaConfig(props)
|
||||||
config.dynamicConfig.initialize(None, None)
|
config.dynamicConfig.initialize(None, None)
|
||||||
|
|
||||||
|
@ -659,7 +659,7 @@ class DynamicBrokerConfigTest {
|
||||||
@Test
|
@Test
|
||||||
def testUpdateMetricReporters(): Unit = {
|
def testUpdateMetricReporters(): Unit = {
|
||||||
val brokerId = 0
|
val brokerId = 0
|
||||||
val origProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(brokerId, null, port = 8181)
|
||||||
|
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
val serverMock = Mockito.mock(classOf[KafkaBroker])
|
val serverMock = Mockito.mock(classOf[KafkaBroker])
|
||||||
|
@ -684,7 +684,7 @@ class DynamicBrokerConfigTest {
|
||||||
@nowarn("cat=deprecation")
|
@nowarn("cat=deprecation")
|
||||||
def testUpdateMetricReportersNoJmxReporter(): Unit = {
|
def testUpdateMetricReportersNoJmxReporter(): Unit = {
|
||||||
val brokerId = 0
|
val brokerId = 0
|
||||||
val origProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(brokerId, null, port = 8181)
|
||||||
origProps.put(MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false")
|
origProps.put(MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false")
|
||||||
|
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
|
@ -711,8 +711,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
|
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
|
||||||
val props = new Properties()
|
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
props.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
|
|
||||||
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
|
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
|
||||||
val config = new KafkaConfig(props)
|
val config = new KafkaConfig(props)
|
||||||
assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
|
assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
|
||||||
|
@ -722,10 +721,10 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testDynamicLogLocalRetentionMsConfig(): Unit = {
|
def testDynamicLogLocalRetentionMsConfig(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
|
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
|
||||||
val config = KafkaConfig(props)
|
val config = KafkaConfig(props)
|
||||||
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer]))
|
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
|
||||||
config.dynamicConfig.initialize(None, None)
|
config.dynamicConfig.initialize(None, None)
|
||||||
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
|
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
|
||||||
|
|
||||||
|
@ -745,10 +744,10 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testDynamicLogLocalRetentionSizeConfig(): Unit = {
|
def testDynamicLogLocalRetentionSizeConfig(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
|
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
|
||||||
val config = KafkaConfig(props)
|
val config = KafkaConfig(props)
|
||||||
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer]))
|
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
|
||||||
config.dynamicConfig.initialize(None, None)
|
config.dynamicConfig.initialize(None, None)
|
||||||
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
|
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
|
||||||
|
|
||||||
|
@ -768,7 +767,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testDynamicLogLocalRetentionSkipsOnInvalidConfig(): Unit = {
|
def testDynamicLogLocalRetentionSkipsOnInvalidConfig(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
|
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
|
||||||
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
|
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
|
||||||
val config = KafkaConfig(props)
|
val config = KafkaConfig(props)
|
||||||
|
@ -794,7 +793,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
|
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
val config = KafkaConfig(props)
|
val config = KafkaConfig(props)
|
||||||
val kafkaBroker = mock(classOf[KafkaBroker])
|
val kafkaBroker = mock(classOf[KafkaBroker])
|
||||||
when(kafkaBroker.config).thenReturn(config)
|
when(kafkaBroker.config).thenReturn(config)
|
||||||
|
@ -828,7 +827,7 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
|
def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
|
||||||
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
origProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "2")
|
origProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "2")
|
||||||
|
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
|
@ -853,9 +852,9 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
|
def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
|
val props = TestUtils.createBrokerConfig(0, null, port = 9092)
|
||||||
val config = KafkaConfig.fromProps(props)
|
val config = KafkaConfig.fromProps(props)
|
||||||
val serverMock: KafkaServer = mock(classOf[KafkaServer])
|
val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
|
||||||
val remoteLogManager = mock(classOf[RemoteLogManager])
|
val remoteLogManager = mock(classOf[RemoteLogManager])
|
||||||
|
|
||||||
Mockito.when(serverMock.config).thenReturn(config)
|
Mockito.when(serverMock.config).thenReturn(config)
|
||||||
|
@ -884,9 +883,9 @@ class DynamicBrokerConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
|
def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
|
val props = TestUtils.createBrokerConfig(0, null, port = 9092)
|
||||||
val config = KafkaConfig.fromProps(props)
|
val config = KafkaConfig.fromProps(props)
|
||||||
val serverMock: KafkaServer = mock(classOf[KafkaServer])
|
val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
|
||||||
val remoteLogManager = mock(classOf[RemoteLogManager])
|
val remoteLogManager = mock(classOf[RemoteLogManager])
|
||||||
|
|
||||||
Mockito.when(serverMock.config).thenReturn(config)
|
Mockito.when(serverMock.config).thenReturn(config)
|
||||||
|
@ -919,9 +918,9 @@ class DynamicBrokerConfigTest {
|
||||||
val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
|
val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
|
||||||
val fetchQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
|
val fetchQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
|
||||||
|
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
|
val props = TestUtils.createBrokerConfig(0, null, port = 9092)
|
||||||
val config = KafkaConfig.fromProps(props)
|
val config = KafkaConfig.fromProps(props)
|
||||||
val serverMock: KafkaServer = mock(classOf[KafkaServer])
|
val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
|
||||||
val remoteLogManager = Mockito.mock(classOf[RemoteLogManager])
|
val remoteLogManager = Mockito.mock(classOf[RemoteLogManager])
|
||||||
|
|
||||||
Mockito.when(serverMock.config).thenReturn(config)
|
Mockito.when(serverMock.config).thenReturn(config)
|
||||||
|
@ -969,11 +968,11 @@ class DynamicBrokerConfigTest {
|
||||||
retentionMs: Long,
|
retentionMs: Long,
|
||||||
logLocalRetentionBytes: Long,
|
logLocalRetentionBytes: Long,
|
||||||
retentionBytes: Long): Unit = {
|
retentionBytes: Long): Unit = {
|
||||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString)
|
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString)
|
||||||
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
|
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
|
||||||
val config = KafkaConfig(props)
|
val config = KafkaConfig(props)
|
||||||
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer]))
|
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
|
||||||
config.dynamicConfig.initialize(None, None)
|
config.dynamicConfig.initialize(None, None)
|
||||||
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
|
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
|
||||||
|
|
||||||
|
@ -985,6 +984,51 @@ class DynamicBrokerConfigTest {
|
||||||
// validate per broker config
|
// validate per broker config
|
||||||
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
|
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class DynamicLogConfigContext(origProps: Properties) {
|
||||||
|
val config = KafkaConfig(origProps)
|
||||||
|
val serverMock = Mockito.mock(classOf[BrokerServer])
|
||||||
|
val logManagerMock = Mockito.mock(classOf[LogManager])
|
||||||
|
|
||||||
|
Mockito.when(serverMock.config).thenReturn(config)
|
||||||
|
Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
|
||||||
|
Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
|
||||||
|
|
||||||
|
val currentDefaultLogConfig = new AtomicReference(new LogConfig(new Properties))
|
||||||
|
Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => currentDefaultLogConfig.get())
|
||||||
|
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
|
||||||
|
.thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))
|
||||||
|
|
||||||
|
config.dynamicConfig.initialize(None, None)
|
||||||
|
config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testDynamicLogConfigHandlesSynonymsCorrectly(): Unit = {
|
||||||
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
|
origProps.put(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, "1")
|
||||||
|
val ctx = new DynamicLogConfigContext(origProps)
|
||||||
|
assertEquals(TimeUnit.MINUTES.toMillis(1), ctx.config.logRetentionTimeMillis)
|
||||||
|
|
||||||
|
val props = new Properties()
|
||||||
|
props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "12345678")
|
||||||
|
ctx.config.dynamicConfig.updateDefaultConfig(props)
|
||||||
|
assertEquals(TimeUnit.MINUTES.toMillis(1), ctx.currentDefaultLogConfig.get().retentionMs)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testLogRetentionTimeMinutesIsNotDynamicallyReconfigurable(): Unit = {
|
||||||
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
|
origProps.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, "1")
|
||||||
|
val ctx = new DynamicLogConfigContext(origProps)
|
||||||
|
assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
|
||||||
|
|
||||||
|
val props = new Properties()
|
||||||
|
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, "3")
|
||||||
|
ctx.config.dynamicConfig.updateDefaultConfig(props)
|
||||||
|
assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
|
||||||
|
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestDynamicThreadPool() extends BrokerReconfigurable {
|
class TestDynamicThreadPool() extends BrokerReconfigurable {
|
||||||
|
|
Loading…
Reference in New Issue