mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: Fix AdminClient.describeConfigs() of listener configs (#4747)
Don't return config values from `describeConfigs` if the config type cannot be determined. Obtain config types correctly for listener configs for `describeConfigs` and password encryption. Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
		
							parent
							
								
									f0a29a6935
								
							
						
					
					
						commit
						57b1c28d60
					
				|  | @ -473,8 +473,14 @@ class AdminManager(val config: KafkaConfig, | |||
|                                      (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = { | ||||
|     val allNames = brokerSynonyms(name) | ||||
|     val configEntryType = configType(name, allNames) | ||||
|     val isSensitive = configEntryType == ConfigDef.Type.PASSWORD | ||||
|     val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType) | ||||
|     // If we can't determine the config entry type, treat it as a sensitive config to be safe | ||||
|     val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null | ||||
|     val valueAsString = if (isSensitive) | ||||
|       null | ||||
|     else value match { | ||||
|       case v: String => v | ||||
|       case _ => ConfigDef.convertToString(value, configEntryType) | ||||
|     } | ||||
|     val allSynonyms = configSynonyms(name, allNames, isSensitive) | ||||
|         .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG) | ||||
|     val synonyms = if (!includeSynonyms) List.empty else allSynonyms | ||||
|  |  | |||
|  | @ -85,14 +85,17 @@ object DynamicBrokerConfig { | |||
| 
 | ||||
|   private val PerBrokerConfigs = DynamicSecurityConfigs  ++ | ||||
|     DynamicListenerConfig.ReconfigurableConfigs | ||||
|   private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp) | ||||
| 
 | ||||
|   val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r | ||||
| 
 | ||||
|   private[server] val DynamicPasswordConfigs = { | ||||
|   private val DynamicPasswordConfigs = { | ||||
|     val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet | ||||
|     AllDynamicConfigs.intersect(passwordConfigs) | ||||
|   } | ||||
| 
 | ||||
|   def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith) | ||||
| 
 | ||||
|   def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = { | ||||
|     name match { | ||||
|       case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp => | ||||
|  | @ -103,7 +106,12 @@ object DynamicBrokerConfig { | |||
|         List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp) | ||||
|       case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp => | ||||
|         List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp) | ||||
|       case ListenerConfigRegex(baseName) if matchListenerOverride => List(name, baseName) | ||||
|       case ListenerConfigRegex(baseName) if matchListenerOverride => | ||||
|         // `ListenerMechanismConfigs` are specified as listenerPrefix.mechanism.<configName> | ||||
|         // and other listener configs are specified as listenerPrefix.<configName> | ||||
|         // Add <configName> as a synonym in both cases. | ||||
|         val mechanismConfig = ListenerMechanismConfigs.find(baseName.endsWith) | ||||
|         List(name, mechanismConfig.getOrElse(baseName)) | ||||
|       case _ => List(name) | ||||
|     } | ||||
|   } | ||||
|  | @ -220,15 +228,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging | |||
|   private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = { | ||||
|     val props = configProps.clone().asInstanceOf[Properties] | ||||
| 
 | ||||
|     def encodePassword(configName: String): Unit = { | ||||
|       val value = props.getProperty(configName) | ||||
|     def encodePassword(configName: String, value: String): Unit = { | ||||
|       if (value != null) { | ||||
|         if (!perBrokerConfig) | ||||
|           throw new ConfigException("Password config can be defined only at broker level") | ||||
|         props.setProperty(configName, passwordEncoder.encode(new Password(value))) | ||||
|       } | ||||
|     } | ||||
|     DynamicPasswordConfigs.foreach(encodePassword) | ||||
|     configProps.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => encodePassword(name, value) } | ||||
|     props | ||||
|   } | ||||
| 
 | ||||
|  | @ -250,8 +257,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging | |||
|     if (!perBrokerConfig) | ||||
|       removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") | ||||
| 
 | ||||
|     def decodePassword(configName: String): Unit = { | ||||
|       val value = props.getProperty(configName) | ||||
|     def decodePassword(configName: String, value: String): Unit = { | ||||
|       if (value != null) { | ||||
|         try { | ||||
|           props.setProperty(configName, passwordEncoder.decode(value).value) | ||||
|  | @ -263,7 +269,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging | |||
|       } | ||||
|     } | ||||
| 
 | ||||
|     DynamicPasswordConfigs.foreach(decodePassword) | ||||
|     props.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => decodePassword(name, value) } | ||||
|     props | ||||
|   } | ||||
| 
 | ||||
|  | @ -273,10 +279,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging | |||
|   // have been removed during broker restart. | ||||
|   private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = { | ||||
|     val props = persistentProps.clone().asInstanceOf[Properties] | ||||
|     if (!props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) { | ||||
|     if (props.asScala.keySet.exists(isPasswordConfig)) { | ||||
|       maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder => | ||||
|         DynamicPasswordConfigs.foreach { configName => | ||||
|           val value = props.getProperty(configName) | ||||
|         persistentProps.asScala.filterKeys(isPasswordConfig).foreach { case (configName, value) => | ||||
|           if (value != null) { | ||||
|             val decoded = try { | ||||
|               Some(passwordDecoder.decode(value).value) | ||||
|  |  | |||
|  | @ -667,10 +667,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|     // ZK with newly encoded values using password.encoder.secret. | ||||
|     servers.foreach { server => | ||||
|       val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString) | ||||
|       val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties] | ||||
|       val config = server.config | ||||
|       val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured")) | ||||
|       val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured")) | ||||
|       val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains) | ||||
|       val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.isPasswordConfig) | ||||
|       assertTrue("Password configs not found", passwordConfigs.nonEmpty) | ||||
|       val passwordDecoder = new PasswordEncoder(secret, | ||||
|         config.passwordEncoderKeyFactoryAlgorithm, | ||||
|         config.passwordEncoderCipherAlgorithm, | ||||
|  | @ -682,18 +684,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|         config.passwordEncoderKeyLength, | ||||
|         config.passwordEncoderIterations) | ||||
|       passwordConfigs.foreach { case (name, value) => | ||||
|           val decoded = passwordDecoder.decode(value).value | ||||
|           props.put(name, passwordEncoder.encode(new Password(decoded))) | ||||
|         val decoded = passwordDecoder.decode(value).value | ||||
|         propsEncodedWithOldSecret.put(name, passwordEncoder.encode(new Password(decoded))) | ||||
|       } | ||||
|       val brokerId = server.config.brokerId | ||||
|       adminZkClient.changeBrokerConfig(Seq(brokerId), props) | ||||
|       adminZkClient.changeBrokerConfig(Seq(brokerId), propsEncodedWithOldSecret) | ||||
|       val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString) | ||||
|       passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) } | ||||
|       passwordConfigs.foreach { case (name, value) => assertNotEquals(props.get(value), updatedProps.get(name)) } | ||||
| 
 | ||||
|       server.startup() | ||||
|       TestUtils.retry(10000) { | ||||
|         val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString) | ||||
|         passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) } | ||||
|         passwordConfigs.foreach { case (name, value) => | ||||
|           assertEquals(passwordDecoder.decode(value), passwordDecoder.decode(newProps.getProperty(name))) } | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|  | @ -725,7 +728,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|       .map { case (name, protocol) => s"${name.value}:${protocol.name}" } | ||||
|       .mkString(",") + s",$listenerName:${securityProtocol.name}" | ||||
| 
 | ||||
|     val props =  adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString) | ||||
|     val props = fetchBrokerConfigsFromZooKeeper(servers.head) | ||||
|     props.put(KafkaConfig.ListenersProp, listeners) | ||||
|     props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap) | ||||
|     securityProtocol match { | ||||
|  | @ -739,6 +742,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|       case SecurityProtocol.PLAINTEXT => // no additional props | ||||
|     } | ||||
| 
 | ||||
|     // Add a config to verify that configs whose types are not known are not returned by describeConfigs() | ||||
|     val unknownConfig = "some.config" | ||||
|     props.put(unknownConfig, "some.config.value") | ||||
| 
 | ||||
|     alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get | ||||
| 
 | ||||
|     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1), | ||||
|  | @ -755,6 +762,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|       saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism))) | ||||
|     else | ||||
|       verifyListener(securityProtocol, None) | ||||
| 
 | ||||
|     val brokerConfigs = describeConfig(adminClients.head).entries.asScala | ||||
|     props.asScala.foreach { case (name, value) => | ||||
|       val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name")) | ||||
|       if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig) | ||||
|         assertNull(s"Password or unknown config returned $entry", entry.value) | ||||
|       else | ||||
|         assertEquals(value, entry.value) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol, | ||||
|  | @ -778,7 +794,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|       .map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" } | ||||
|       .mkString(",") | ||||
| 
 | ||||
|     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString) | ||||
|     val props = fetchBrokerConfigsFromZooKeeper(servers.head) | ||||
|     val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix)) | ||||
|     listenerProps.foreach(props.remove) | ||||
|     props.put(KafkaConfig.ListenersProp, listeners) | ||||
|  | @ -811,6 +827,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|     verifyProduceConsume(producer, consumer, numRecords = 10, topic) | ||||
|   } | ||||
| 
 | ||||
|   private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = { | ||||
|     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString) | ||||
|     server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true) | ||||
|   } | ||||
| 
 | ||||
|   private def bootstrapServers: String = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal)) | ||||
| 
 | ||||
|   private def createProducer(trustStore: File, retries: Int, | ||||
|  | @ -1090,9 +1111,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|   private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = { | ||||
|     val keystoreProps = new Properties | ||||
|     addKeystoreWithListenerPrefix(sslProperties, keystoreProps, SecureExternal) | ||||
|     kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true) | ||||
|     val persistentProps = kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true) | ||||
|     zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) | ||||
|     adminZkClient.changeBrokerConfig(brokers, keystoreProps) | ||||
|     adminZkClient.changeBrokerConfig(brokers, persistentProps) | ||||
|   } | ||||
| 
 | ||||
|   private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = { | ||||
|  |  | |||
|  | @ -142,14 +142,16 @@ class DynamicBrokerConfigTest { | |||
|   } | ||||
| 
 | ||||
|   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) { | ||||
|     val config = KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)) | ||||
|     val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) | ||||
|     configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret") | ||||
|     val config = KafkaConfig(configProps) | ||||
|     val props = new Properties | ||||
|     props.put(name, value) | ||||
|     val oldValue = config.originals.get(name) | ||||
| 
 | ||||
|     def updateConfig() = { | ||||
|       if (perBrokerConfig) | ||||
|         config.dynamicConfig.updateBrokerConfig(0, props) | ||||
|         config.dynamicConfig.updateBrokerConfig(0, config.dynamicConfig.toPersistentProps(props, perBrokerConfig)) | ||||
|       else | ||||
|         config.dynamicConfig.updateDefaultConfig(props) | ||||
|     } | ||||
|  | @ -266,6 +268,18 @@ class DynamicBrokerConfigTest { | |||
|     dynamicListenerConfig.validateReconfiguration(newConfig) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testSynonyms(): Unit = { | ||||
|     assertEquals(List("listener.name.secure.ssl.keystore.type", "ssl.keystore.type"), | ||||
|       DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type", matchListenerOverride = true)) | ||||
|     assertEquals(List("listener.name.sasl_ssl.plain.sasl.jaas.config", "sasl.jaas.config"), | ||||
|       DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config", matchListenerOverride = true)) | ||||
|     assertEquals(List("some.config"), | ||||
|       DynamicBrokerConfig.brokerConfigSynonyms("some.config", matchListenerOverride = true)) | ||||
|     assertEquals(List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp), | ||||
|       DynamicBrokerConfig.brokerConfigSynonyms(KafkaConfig.LogRollTimeMillisProp, matchListenerOverride = true)) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = { | ||||
|     val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient]) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue