KAFKA-6672; ConfigCommand should create config change parent path if needed (#4727)

Change `KafkaZkClient.createConfigChangeNotification` to ensure creation of the change directory. This fixes failing system tests which depend on setting SCRAM credentials prior to broker startup. Existing test case has been modified for new expected usage.

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Jason Gustafson 2018-03-16 22:29:22 -07:00 committed by GitHub
parent a70e4f95d7
commit 9782465d6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 8 deletions

View File

@ -80,12 +80,12 @@ object ConfigCommand extends Config {
}
} catch {
case e @ (_: IllegalArgumentException | _: InvalidConfigException | _: OptionException) =>
logger.debug(s"Failed config command with args $args", e)
logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e)
System.err.println(e.getMessage)
Exit.exit(1)
case t: Throwable =>
System.err.println(s"Error while executing config command with args $args")
System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'")
t.printStackTrace(System.err)
Exit.exit(1)
}

View File

@ -286,10 +286,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
* @throws KeeperException if there is an error while setting or creating the znode
*/
def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
val path = ConfigEntityChangeNotificationSequenceZNode.createPath
val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.maybeThrow
createResponse.maybeThrow()
}
/**

View File

@ -562,16 +562,19 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testCreateConfigChangeNotification(): Unit = {
intercept[NoNodeException] {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
}
assertFalse(zkClient.pathExists(ConfigEntityChangeNotificationZNode.path))
zkClient.createTopLevelPaths()
// The parent path is created if needed
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
assertPathExistenceAndData(
"/config/changes/config_change_0000000000",
"""{"version":2,"entity_path":"/config/topics/topic1"}""")
// Creation does not fail if the parent path exists
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic2))
assertPathExistenceAndData(
"/config/changes/config_change_0000000001",
"""{"version":2,"entity_path":"/config/topics/topic2"}""")
}
private def createLogProps(bytesProp: Int): Properties = {