diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 9014fab942d..b54defc1517 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -473,8 +473,14 @@ class AdminManager(val config: KafkaConfig, (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = { val allNames = brokerSynonyms(name) val configEntryType = configType(name, allNames) - val isSensitive = configEntryType == ConfigDef.Type.PASSWORD - val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType) + // If we can't determine the config entry type, treat it as a sensitive config to be safe + val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null + val valueAsString = if (isSensitive) + null + else value match { + case v: String => v + case _ => ConfigDef.convertToString(value, configEntryType) + } val allSynonyms = configSynonyms(name, allNames, isSensitive) .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG) val synonyms = if (!includeSynonyms) List.empty else allSynonyms diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 92fd5d73136..766907a7de2 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -85,14 +85,17 @@ object DynamicBrokerConfig { private val PerBrokerConfigs = DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs + private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp) val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r - private[server] val DynamicPasswordConfigs = { + private val DynamicPasswordConfigs = { val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet AllDynamicConfigs.intersect(passwordConfigs) } + def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith) + def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = { name match { case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp => @@ -103,7 +106,12 @@ object DynamicBrokerConfig { List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp) case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp => List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp) - case ListenerConfigRegex(baseName) if matchListenerOverride => List(name, baseName) + case ListenerConfigRegex(baseName) if matchListenerOverride => + // `ListenerMechanismConfigs` are specified as listenerPrefix.mechanism. + // and other listener configs are specified as listenerPrefix. + // Add as a synonym in both cases. + val mechanismConfig = ListenerMechanismConfigs.find(baseName.endsWith) + List(name, mechanismConfig.getOrElse(baseName)) case _ => List(name) } } @@ -220,15 +228,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = { val props = configProps.clone().asInstanceOf[Properties] - def encodePassword(configName: String): Unit = { - val value = props.getProperty(configName) + def encodePassword(configName: String, value: String): Unit = { if (value != null) { if (!perBrokerConfig) throw new ConfigException("Password config can be defined only at broker level") props.setProperty(configName, passwordEncoder.encode(new Password(value))) } } - DynamicPasswordConfigs.foreach(encodePassword) + configProps.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => encodePassword(name, value) } props } @@ -250,8 +257,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging if (!perBrokerConfig) removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") - def decodePassword(configName: String): Unit = { - val value = props.getProperty(configName) + def decodePassword(configName: String, value: String): Unit = { if (value != null) { try { props.setProperty(configName, passwordEncoder.decode(value).value) @@ -263,7 +269,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } - DynamicPasswordConfigs.foreach(decodePassword) + props.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => decodePassword(name, value) } props } @@ -273,10 +279,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // have been removed during broker restart. private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = { val props = persistentProps.clone().asInstanceOf[Properties] - if (!props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) { + if (props.asScala.keySet.exists(isPasswordConfig)) { maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder => - DynamicPasswordConfigs.foreach { configName => - val value = props.getProperty(configName) + persistentProps.asScala.filterKeys(isPasswordConfig).foreach { case (configName, value) => if (value != null) { val decoded = try { Some(passwordDecoder.decode(value).value) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 833a98ee43f..e207de8f069 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -667,10 +667,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // ZK with newly encoded values using password.encoder.secret. servers.foreach { server => val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString) + val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties] val config = server.config val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured")) val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured")) - val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains) + val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.isPasswordConfig) + assertTrue("Password configs not found", passwordConfigs.nonEmpty) val passwordDecoder = new PasswordEncoder(secret, config.passwordEncoderKeyFactoryAlgorithm, config.passwordEncoderCipherAlgorithm, @@ -682,18 +684,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet config.passwordEncoderKeyLength, config.passwordEncoderIterations) passwordConfigs.foreach { case (name, value) => - val decoded = passwordDecoder.decode(value).value - props.put(name, passwordEncoder.encode(new Password(decoded))) + val decoded = passwordDecoder.decode(value).value + propsEncodedWithOldSecret.put(name, passwordEncoder.encode(new Password(decoded))) } val brokerId = server.config.brokerId - adminZkClient.changeBrokerConfig(Seq(brokerId), props) + adminZkClient.changeBrokerConfig(Seq(brokerId), propsEncodedWithOldSecret) val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString) - passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) } + passwordConfigs.foreach { case (name, value) => assertNotEquals(props.get(value), updatedProps.get(name)) } server.startup() TestUtils.retry(10000) { val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString) - passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) } + passwordConfigs.foreach { case (name, value) => + assertEquals(passwordDecoder.decode(value), passwordDecoder.decode(newProps.getProperty(name))) } } } @@ -725,7 +728,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet .map { case (name, protocol) => s"${name.value}:${protocol.name}" } .mkString(",") + s",$listenerName:${securityProtocol.name}" - val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString) + val props = fetchBrokerConfigsFromZooKeeper(servers.head) props.put(KafkaConfig.ListenersProp, listeners) props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap) securityProtocol match { @@ -739,6 +742,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet case SecurityProtocol.PLAINTEXT => // no additional props } + // Add a config to verify that configs whose types are not known are not returned by describeConfigs() + val unknownConfig = "some.config" + props.put(unknownConfig, "some.config.value") + alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1), @@ -755,6 +762,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism))) else verifyListener(securityProtocol, None) + + val brokerConfigs = describeConfig(adminClients.head).entries.asScala + props.asScala.foreach { case (name, value) => + val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name")) + if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig) + assertNull(s"Password or unknown config returned $entry", entry.value) + else + assertEquals(value, entry.value) + } } private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol, @@ -778,7 +794,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet .map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" } .mkString(",") - val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString) + val props = fetchBrokerConfigsFromZooKeeper(servers.head) val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix)) listenerProps.foreach(props.remove) props.put(KafkaConfig.ListenersProp, listeners) @@ -811,6 +827,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet verifyProduceConsume(producer, consumer, numRecords = 10, topic) } + private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = { + val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString) + server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true) + } + private def bootstrapServers: String = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal)) private def createProducer(trustStore: File, retries: Int, @@ -1090,9 +1111,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = { val keystoreProps = new Properties addKeystoreWithListenerPrefix(sslProperties, keystoreProps, SecureExternal) - kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true) + val persistentProps = kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true) zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) - adminZkClient.changeBrokerConfig(brokers, keystoreProps) + adminZkClient.changeBrokerConfig(brokers, persistentProps) } private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index bca98d2cee0..5c88bf27f6d 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -142,14 +142,16 @@ class DynamicBrokerConfigTest { } private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) { - val config = KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)) + val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret") + val config = KafkaConfig(configProps) val props = new Properties props.put(name, value) val oldValue = config.originals.get(name) def updateConfig() = { if (perBrokerConfig) - config.dynamicConfig.updateBrokerConfig(0, props) + config.dynamicConfig.updateBrokerConfig(0, config.dynamicConfig.toPersistentProps(props, perBrokerConfig)) else config.dynamicConfig.updateDefaultConfig(props) } @@ -266,6 +268,18 @@ class DynamicBrokerConfigTest { dynamicListenerConfig.validateReconfiguration(newConfig) } + @Test + def testSynonyms(): Unit = { + assertEquals(List("listener.name.secure.ssl.keystore.type", "ssl.keystore.type"), + DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type", matchListenerOverride = true)) + assertEquals(List("listener.name.sasl_ssl.plain.sasl.jaas.config", "sasl.jaas.config"), + DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config", matchListenerOverride = true)) + assertEquals(List("some.config"), + DynamicBrokerConfig.brokerConfigSynonyms("some.config", matchListenerOverride = true)) + assertEquals(List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp), + DynamicBrokerConfig.brokerConfigSynonyms(KafkaConfig.LogRollTimeMillisProp, matchListenerOverride = true)) + } + @Test def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = { val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])