diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index 26907587a4f..1d3320ffc8c 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -197,7 +197,9 @@ class ZkMigrationClient( brokerIdConsumer: Consumer[Integer] ): Unit = wrapZkException { configClient.iterateBrokerConfigs((broker, props) => { - brokerIdConsumer.accept(Integer.valueOf(broker)) + if (broker.nonEmpty) { + brokerIdConsumer.accept(Integer.valueOf(broker)) + } val batch = new util.ArrayList[ApiMessageAndVersion]() props.forEach((key, value) => { batch.add(new ApiMessageAndVersion(new ConfigRecord() diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala index e7fb5f21051..57ff05ee38e 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala @@ -62,10 +62,15 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { props.put(KafkaConfig.SslKeystorePasswordProp, encoder.encode(new Password(SECRET))) // sensitive config zkClient.setOrCreateEntityConfigs(ConfigType.Broker, "1", props) + val defaultProps = new Properties() + defaultProps.put(KafkaConfig.DefaultReplicationFactorProp, "3") // normal config + zkClient.setOrCreateEntityConfigs(ConfigType.Broker, "", defaultProps) + migrationClient.migrateBrokerConfigs(batch => batches.add(batch), brokerId => brokers.add(brokerId)) assertEquals(1, brokers.size()) - assertEquals(1, batches.size()) + assertEquals(2, batches.size()) assertEquals(2, batches.get(0).size) + assertEquals(1, batches.get(1).size) batches.get(0).forEach(record => { val message = record.message().asInstanceOf[ConfigRecord] @@ -81,6 +86,12 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { } }) + val record = batches.get(1).get(0).message().asInstanceOf[ConfigRecord] + assertEquals(ConfigResource.Type.BROKER.id(), record.resourceType()) + assertEquals("", record.resourceName()) + assertEquals(KafkaConfig.DefaultReplicationFactorProp, record.name()) + assertEquals("3", record.value()) + // Update the sensitive config value from the config client and check that the value // persisted in Zookeeper is encrypted. val newProps = new util.HashMap[String, String]()