From 9782465d6f28dda7fd7f156743f4278e6b62364b Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 16 Mar 2018 22:29:22 -0700 Subject: [PATCH] KAFKA-6672; ConfigCommand should create config change parent path if needed (#4727) Change `KafkaZkClient.createConfigChangeNotification` to ensure creation of the change directory. This fixes failing system tests which depend on setting SCRAM credentials prior to broker startup. Existing test case has been modified for new expected usage. Reviewers: Ismael Juma --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 4 ++-- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 ++- .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 13 ++++++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 044be6a5ba2..3563448feb6 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -80,12 +80,12 @@ object ConfigCommand extends Config { } } catch { case e @ (_: IllegalArgumentException | _: InvalidConfigException | _: OptionException) => - logger.debug(s"Failed config command with args $args", e) + logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e) System.err.println(e.getMessage) Exit.exit(1) case t: Throwable => - System.err.println(s"Error while executing config command with args $args") + System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'") t.printStackTrace(System.err) Exit.exit(1) } diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index d61b281eed1..0a2d96a796a 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -286,10 +286,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @throws KeeperException if there is an error while setting or creating the znode */ def createConfigChangeNotification(sanitizedEntityPath: String): Unit = { + makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) val path = ConfigEntityChangeNotificationSequenceZNode.createPath val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) - createResponse.maybeThrow + createResponse.maybeThrow() } /** diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index e44c2c94e52..a6c095688f3 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -562,16 +562,19 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testCreateConfigChangeNotification(): Unit = { - intercept[NoNodeException] { - zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1)) - } + assertFalse(zkClient.pathExists(ConfigEntityChangeNotificationZNode.path)) - zkClient.createTopLevelPaths() + // The parent path is created if needed zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1)) - assertPathExistenceAndData( "/config/changes/config_change_0000000000", """{"version":2,"entity_path":"/config/topics/topic1"}""") + + // Creation does not fail if the parent path exists + zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic2)) + assertPathExistenceAndData( + "/config/changes/config_change_0000000001", + """{"version":2,"entity_path":"/config/topics/topic2"}""") } private def createLogProps(bytesProp: Int): Properties = {