KAFKA-2641; Upgrade path for ZK authentication

This pull request adds a configuration parameter and a migration tool. It is also based on pull request #303, which should go in first.

Author: flavio junqueira <fpj@apache.org>
Author: Flavio Junqueira <fpj@apache.org>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #313 from fpj/KAFKA-2641
This commit is contained in:
flavio junqueira 2015-10-23 15:11:07 -07:00 committed by Jun Rao
parent 701c46b3a3
commit 21443f214f
9 changed files with 285 additions and 7 deletions

View File

@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator $@

View File

@ -109,6 +109,11 @@ public class JaasUtils {
throw new KafkaException("Exception while determining if ZooKeeper is secure");
}
}
/*
* Tests fail if we don't reset the login configuration. It is unclear
* what is actually triggering this bug.
*/
Configuration.setConfiguration(null);
return isSecurityEnabled;
}

View File

@ -226,7 +226,7 @@ object AdminUtils extends Logging {
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.zkClient.exists(getTopicPath(topic))
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,

View File

@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import joptsimple.OptionParser
import org.I0Itec.zkclient.exception.ZkException
import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
import org.apache.log4j.Level
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection._
import scala.collection.mutable.Queue
import scala.concurrent._
import scala.concurrent.duration._
/**
* This tool is to be used when making access to ZooKeeper authenticated or
* the other way around, when removing authenticated access. The exact steps
* to migrate a Kafka cluster from unsecure to secure with respect to ZooKeeper
* access are the following:
*
* 1- Perform a rolling upgrade of Kafka servers, setting zookeeper.set.acl to false
* and passing a valid JAAS login file via the system property
* java.security.auth.login.config
* 2- Perform a second rolling upgrade keeping the system property for the login file
* and now setting zookeeper.set.acl to true
* 3- Finally run this tool. There is a script under ./bin. Run
* ./bin/zookeeper-security-migration --help
* to see the configuration parameters. An example of running it is the following:
* ./bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connection=localhost:2181
*
* To convert a cluster from secure to unsecure, we need to perform the following
* steps:
* 1- Perform a rolling upgrade setting zookeeper.set.acl to false for each server
* 2- Run this migration tool, setting zookeeper.acl to unsecure
* 3- Perform another rolling upgrade to remove the system property setting the
* login file (java.security.auth.login.config).
*/
object ZkSecurityMigrator extends Logging {
val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of "
+ "znodes as part of the process of setting up ZooKeeper "
+ "authentication.")
def run(args: Array[String]) {
var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val parser = new OptionParser()
val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
+ " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
val jaasFileOpt = parser.accepts("jaas.file", "JAAS Config file.").withOptionalArg().ofType(classOf[String])
val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " +
"takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181").
ofType(classOf[String])
val zkSessionTimeoutOpt = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout.").
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
val zkConnectionTimeoutOpt = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout.").
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
val helpOpt = parser.accepts("help", "Print usage information.")
val options = parser.parse(args : _*)
if (options.has(helpOpt))
CommandLineUtils.printUsageAndDie(parser, usageMessage)
if ((jaasFile == null) && !options.has(jaasFileOpt)) {
val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set either " +
"the system property %s or the option %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "--jaas.file"))
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
if (jaasFile == null) {
jaasFile = options.valueOf(jaasFileOpt)
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
}
if (!JaasUtils.isZkSecurityEnabled(jaasFile)) {
val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile)
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
val zkAcl: Boolean = options.valueOf(zkAclOpt) match {
case "secure" =>
info("zookeeper.acl option is secure")
true
case "unsecure" =>
info("zookeeper.acl option is unsecure")
false
case _ =>
CommandLineUtils.printUsageAndDie(parser, usageMessage)
}
val zkUrl = options.valueOf(zkUrlOpt)
val zkSessionTimeout = options.valueOf(zkSessionTimeoutOpt).intValue
val zkConnectionTimeout = options.valueOf(zkConnectionTimeoutOpt).intValue
val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl)
val migrator = new ZkSecurityMigrator(zkUtils)
migrator.run()
}
def main(args: Array[String]) {
try {
run(args)
} catch {
case e: Exception =>
e.printStackTrace()
}
}
}
class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
private val workQueue = new LinkedBlockingQueue[Runnable]
private val futures = new Queue[Future[String]]
private def setAclsRecursively(path: String) = {
info("Setting ACL for path %s".format(path))
val setPromise = Promise[String]
val childrenPromise = Promise[String]
futures.synchronized {
futures += setPromise.future
futures += childrenPromise.future
}
zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, setPromise)
zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
}
private object GetChildrenCallback extends ChildrenCallback {
def processResult(rc: Int,
path: String,
ctx: Object,
children: java.util.List[String]) {
val zkHandle = zkUtils.zkConnection.getZookeeper
val promise = ctx.asInstanceOf[Promise[String]]
Code.get(rc) match {
case Code.OK =>
// Set ACL for each child
for (child <- children.asScala)
setAclsRecursively(s"$path/$child")
promise success "done"
case Code.CONNECTIONLOSS =>
zkHandle.getChildren(path, false, GetChildrenCallback, ctx)
case Code.NONODE =>
warn("Node is gone, it could be have been legitimately deleted: %s".format(path))
promise success "done"
case Code.SESSIONEXPIRED =>
// Starting a new session isn't really a problem, but it'd complicate
// the logic of the tool, so we quit and let the user re-run it.
System.out.println("ZooKeeper session expired while changing ACLs")
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
case _ =>
System.out.println("Unexpected return code: %d".format(rc))
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
}
}
}
private object SetACLCallback extends StatCallback {
def processResult(rc: Int,
path: String,
ctx: Object,
stat: Stat) {
val zkHandle = zkUtils.zkConnection.getZookeeper
val promise = ctx.asInstanceOf[Promise[String]]
Code.get(rc) match {
case Code.OK =>
info("Successfully set ACLs for %s".format(path))
promise success "done"
case Code.CONNECTIONLOSS =>
zkHandle.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, ctx)
case Code.NONODE =>
warn("Znode is gone, it could be have been legitimately deleted: %s".format(path))
promise success "done"
case Code.SESSIONEXPIRED =>
// Starting a new session isn't really a problem, but it'd complicate
// the logic of the tool, so we quit and let the user re-run it.
System.out.println("ZooKeeper session expired while changing ACLs")
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
case _ =>
System.out.println("Unexpected return code: %d".format(rc))
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
}
}
}
private def run(): Unit = {
try {
for (path <- zkUtils.securePersistentZkPaths) {
debug("Going to set ACL for %s".format(path))
zkUtils.makeSurePersistentPathExists(path)
setAclsRecursively(path)
}
@tailrec
def recurse(): Unit = {
val future = futures.synchronized {
futures.headOption
}
future match {
case Some(a) =>
Await.result(a, 6000 millis)
futures.synchronized { futures.dequeue }
recurse
case None =>
}
}
recurse()
} finally {
zkUtils.close
}
}
}

View File

@ -39,6 +39,7 @@ object Defaults {
/** ********* Zookeeper Configuration ***********/
val ZkSessionTimeoutMs = 6000
val ZkSyncTimeMs = 2000
val ZkEnableSecureAcls = false
/** ********* General Configuration ***********/
val MaxReservedBrokerId = 1000
@ -187,6 +188,7 @@ object KafkaConfig {
val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
val ZkEnableSecureAclsProp = "zookeeper.set.acl"
/** ********* General Configuration ***********/
val MaxReservedBrokerIdProp = "reserved.broker.max.id"
val BrokerIdProp = "broker.id"
@ -334,6 +336,7 @@ object KafkaConfig {
val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper"
val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
/** ********* General Configuration ***********/
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
val BrokerIdDoc = "The broker id for this server. " +
@ -515,6 +518,7 @@ object KafkaConfig {
.define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, ZkSessionTimeoutMsDoc)
.define(ZkConnectionTimeoutMsProp, INT, HIGH, ZkConnectionTimeoutMsDoc, false)
.define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
.define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
/** ********* General Configuration ***********/
.define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, atLeast(0), MEDIUM, MaxReservedBrokerIdProp)
@ -702,6 +706,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val zkConnectionTimeoutMs: Int =
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
/** ********* General Configuration ***********/
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)

View File

@ -260,12 +260,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
""
}
val secureAclsEnabled = JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)) && config.zkEnableSecureAcls
if(config.zkEnableSecureAcls && !secureAclsEnabled) {
throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.")
}
if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
secureAclsEnabled)
zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
info("Created zookeeper path " + chroot)
zkClientForChrootCreation.zkClient.close()
@ -274,7 +279,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val zkUtils = ZkUtils(config.zkConnect,
config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
secureAclsEnabled)
zkUtils.setupCommonPaths()
zkUtils
}

View File

@ -112,7 +112,7 @@ object ZkUtils {
* Get calls that only depend on static paths
*/
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
ZkUtils.BrokerTopicsPath + "/" + topic
}
def getTopicPartitionsPath(topic: String): String = {
@ -126,7 +126,7 @@ object ZkUtils {
getTopicPartitionPath(topic, partitionId) + "/" + "state"
def getEntityConfigRootPath(entityType: String): String =
EntityConfigPath + "/" + entityType
ZkUtils.EntityConfigPath + "/" + entityType
def getEntityConfigPath(entityType: String, entity: String): String =
getEntityConfigRootPath(entityType) + "/" + entity
@ -149,6 +149,15 @@ class ZkUtils(val zkClient: ZkClient,
BrokerSequenceIdPath,
IsrChangeNotificationPath)
val securePersistentZkPaths = Seq(BrokerIdsPath,
BrokerTopicsPath,
EntityConfigChangesPath,
getEntityConfigRootPath(ConfigType.Topic),
getEntityConfigRootPath(ConfigType.Client),
DeleteTopicsPath,
BrokerSequenceIdPath,
IsrChangeNotificationPath)
val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
def getController(): Int = {
@ -713,7 +722,7 @@ class ZkUtils(val zkClient: ZkClient,
def deletePartition(brokerId: Int, topic: String) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
zkClient.delete(brokerIdPath)
val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
zkClient.delete(brokerPartTopicPath)
}

View File

@ -31,7 +31,6 @@ object DeleteZKPath {
val config = new ConsumerConfig(Utils.loadProps(args(0)))
val zkPath = args(1)
val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {

View File

@ -398,6 +398,7 @@ class KafkaConfigTest {
case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")