From c9318ced19c812c90066af468732d8af8d12f890 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 5 Sep 2019 09:23:11 +0100 Subject: [PATCH] KAFKA-8857; Don't check synonyms while determining if config is readOnly (#7278) Reviewers: Manikumar Reddy --- .../scala/kafka/server/AdminManager.scala | 2 +- .../DynamicBrokerReconfigurationTest.scala | 41 +++++++++++++++++-- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index fc3c93432f3..3ed365e43c7 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -661,7 +661,7 @@ class AdminManager(val config: KafkaConfig, .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG) val synonyms = if (!includeSynonyms) List.empty else allSynonyms val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source - val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains) + val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name) new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava) } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 3a9920e6eb2..f58046ad489 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -119,6 +119,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") + props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) + props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString) props ++= sslProperties1 props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal)) @@ -159,9 +161,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } @Test - def testKeyStoreDescribeUsingAdminClient(): Unit = { + def testConfigDescribeUsingAdminClient(): Unit = { - def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, expectedProps: Properties): Unit = { + def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, isReadOnly: Boolean, + expectedProps: Properties): Unit = { if (isSensitive) { assertTrue(s"Value is sensitive: $configName", configEntry.isSensitive) assertNull(s"Sensitive value returned for $configName", configEntry.value) @@ -169,6 +172,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet assertFalse(s"Config is not sensitive: $configName", configEntry.isSensitive) assertEquals(expectedProps.getProperty(configName), configEntry.value) } + assertEquals(s"isReadOnly incorrect for $configName: $configEntry", isReadOnly, configEntry.isReadOnly) } def verifySynonym(configName: String, synonym: ConfigSynonym, isSensitive: Boolean, @@ -203,7 +207,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet KEYSTORE_PROPS.asScala.foreach { configName => val desc = configEntry(configDesc, s"$prefix$configName") val isSensitive = configName.contains("password") - verifyConfig(configName, desc, isSensitive, if (prefix.isEmpty) invalidSslProperties else sslProperties1) + verifyConfig(configName, desc, isSensitive, isReadOnly = prefix.nonEmpty, if (prefix.isEmpty) invalidSslProperties else sslProperties1) val defaultValue = if (configName == SSL_KEYSTORE_TYPE_CONFIG) Some("JKS") else None verifySynonyms(configName, desc.synonyms, isSensitive, prefix, defaultValue) } @@ -215,6 +219,37 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val configDesc = describeConfig(adminClient) verifySslConfig("listener.name.external.", sslProperties1, configDesc) verifySslConfig("", invalidSslProperties, configDesc) + + // Verify a few log configs with and without synonyms + val expectedProps = new Properties + expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1680000000") + expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168") + expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168") + expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1") + val logRetentionMs = configEntry(configDesc, KafkaConfig.LogRetentionTimeMillisProp) + verifyConfig(KafkaConfig.LogRetentionTimeMillisProp, logRetentionMs, + isSensitive = false, isReadOnly = false, expectedProps) + val logRetentionHours = configEntry(configDesc, KafkaConfig.LogRetentionTimeHoursProp) + verifyConfig(KafkaConfig.LogRetentionTimeHoursProp, logRetentionHours, + isSensitive = false, isReadOnly = true, expectedProps) + val logRollHours = configEntry(configDesc, KafkaConfig.LogRollTimeHoursProp) + verifyConfig(KafkaConfig.LogRollTimeHoursProp, logRollHours, + isSensitive = false, isReadOnly = true, expectedProps) + val logCleanerThreads = configEntry(configDesc, KafkaConfig.LogCleanerThreadsProp) + verifyConfig(KafkaConfig.LogCleanerThreadsProp, logCleanerThreads, + isSensitive = false, isReadOnly = false, expectedProps) + + def synonymsList(configEntry: ConfigEntry): List[(String, ConfigSource)] = + configEntry.synonyms.asScala.map(s => (s.name, s.source)).toList + assertEquals(List((KafkaConfig.LogRetentionTimeMillisProp, ConfigSource.STATIC_BROKER_CONFIG), + (KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.STATIC_BROKER_CONFIG), + (KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.DEFAULT_CONFIG)), + synonymsList(logRetentionMs)) + assertEquals(List((KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.STATIC_BROKER_CONFIG), + (KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.DEFAULT_CONFIG)), + synonymsList(logRetentionHours)) + assertEquals(List((KafkaConfig.LogRollTimeHoursProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logRollHours)) + assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads)) } @Test