Handle case of default broker in config migration (#14237)

When collecting the set of broker IDs during the migration, don't try to parse the default broker resource `""` as a broker ID.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
David Arthur 2023-08-18 12:44:01 -04:00 committed by GitHub
parent ee036ed9ef
commit 3ad5f42f59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 2 deletions

View File

@ -197,7 +197,9 @@ class ZkMigrationClient(
brokerIdConsumer: Consumer[Integer]
): Unit = wrapZkException {
configClient.iterateBrokerConfigs((broker, props) => {
brokerIdConsumer.accept(Integer.valueOf(broker))
if (broker.nonEmpty) {
brokerIdConsumer.accept(Integer.valueOf(broker))
}
val batch = new util.ArrayList[ApiMessageAndVersion]()
props.forEach((key, value) => {
batch.add(new ApiMessageAndVersion(new ConfigRecord()

View File

@ -62,10 +62,15 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
props.put(KafkaConfig.SslKeystorePasswordProp, encoder.encode(new Password(SECRET))) // sensitive config
zkClient.setOrCreateEntityConfigs(ConfigType.Broker, "1", props)
val defaultProps = new Properties()
defaultProps.put(KafkaConfig.DefaultReplicationFactorProp, "3") // normal config
zkClient.setOrCreateEntityConfigs(ConfigType.Broker, "<default>", defaultProps)
migrationClient.migrateBrokerConfigs(batch => batches.add(batch), brokerId => brokers.add(brokerId))
assertEquals(1, brokers.size())
assertEquals(1, batches.size())
assertEquals(2, batches.size())
assertEquals(2, batches.get(0).size)
assertEquals(1, batches.get(1).size)
batches.get(0).forEach(record => {
val message = record.message().asInstanceOf[ConfigRecord]
@ -81,6 +86,12 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
}
})
val record = batches.get(1).get(0).message().asInstanceOf[ConfigRecord]
assertEquals(ConfigResource.Type.BROKER.id(), record.resourceType())
assertEquals("", record.resourceName())
assertEquals(KafkaConfig.DefaultReplicationFactorProp, record.name())
assertEquals("3", record.value())
// Update the sensitive config value from the config client and check that the value
// persisted in Zookeeper is encrypted.
val newProps = new util.HashMap[String, String]()