KAFKA-6805: Enable broker configs to be stored in ZK before broker start (#4898)

Support configuration of dynamic broker configs in ZooKeeper before starting brokers using ConfigCommand. This will allow password configs to be encrypted and stored in ZooKeeper, without requiring clear passwords in server.properties to bootstrap the broker first.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Rajini Sivaram 2018-06-18 18:28:08 +01:00 committed by GitHub
parent 1546bcd877
commit d504e85011
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 295 additions and 82 deletions

View File

@ -24,13 +24,14 @@ import joptsimple._
import kafka.common.Config
import kafka.common.InvalidConfigException
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils.{CommandLineUtils, Exit}
import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
import kafka.utils.{CommandLineUtils, Exit, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@ -56,11 +57,14 @@ import scala.collection.JavaConverters._
object ConfigCommand extends Config {
val DefaultScramIterations = 4096
// Dynamic broker configs can only be updated using the new AdminClient since they may require
// password encryption currently implemented only in the broker. For consistency with older versions,
// quota-related broker configs can still be updated using ZooKeeper. ConfigCommand will be migrated
// fully to the new AdminClient later (KIP-248).
val BrokerConfigsUpdatableUsingZooKeeper = Set(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
// Dynamic broker configs can only be updated using the new AdminClient once brokers have started
// so that configs may be fully validated. Prior to starting brokers, updates may be performed using
// ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted in ZK,
// avoiding clear passwords in server.properties. For consistency with older versions, quota-related
// broker configs can still be updated using ZooKeeper at any time. ConfigCommand will be migrated
// to the new AdminClient later for these configs (KIP-248).
val BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning = Set(
DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
@ -114,9 +118,25 @@ object ConfigCommand extends Config {
if (entityType == ConfigType.User)
preProcessScramCredentials(configsToBeAdded)
if (entityType == ConfigType.Broker) {
require(configsToBeAdded.asScala.keySet.forall(BrokerConfigsUpdatableUsingZooKeeper.contains),
s"--bootstrap-server option must be specified to update broker configs $configsToBeAdded")
else if (entityType == ConfigType.Broker) {
// Replication quota configs may be updated using ZK at any time. Other dynamic broker configs
// may be updated using ZooKeeper only if the corresponding broker is not running. Dynamic broker
// configs at cluster-default level may be configured using ZK only if there are no brokers running.
val dynamicBrokerConfigs = configsToBeAdded.asScala.keySet.filterNot(BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning.contains)
if (dynamicBrokerConfigs.nonEmpty) {
val perBrokerConfig = entityName != ConfigEntityName.Default
val errorMessage = s"--bootstrap-server option must be specified to update broker configs $dynamicBrokerConfigs."
val info = "Broker configuraton updates using ZooKeeper are supported for bootstrapping before brokers" +
" are started to enable encrypted password configs to be stored in ZooKeeper."
if (perBrokerConfig) {
adminZkClient.parseBroker(entityName).foreach { brokerId =>
require(zkClient.getBroker(brokerId).isEmpty, s"$errorMessage when broker $entityName is running. $info")
}
} else {
require(zkClient.getAllBrokersInCluster.isEmpty, s"$errorMessage for default cluster if any broker is running. $info")
}
preProcessBrokerConfigs(configsToBeAdded, perBrokerConfig)
}
}
// compile the final set of configs
@ -156,6 +176,49 @@ object ConfigCommand extends Config {
}
}
private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
throw new IllegalArgumentException("Password encoder secret not specified"))
new PasswordEncoder(new Password(encoderSecret),
None,
encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderIterations))
}
/**
* Pre-process broker configs provided to convert them to persistent format.
* Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`.
* The secret is removed from `configsToBeAdded` and will not be persisted in ZooKeeper.
*/
private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean) {
val passwordEncoderConfigs = new Properties
passwordEncoderConfigs ++= configsToBeAdded.asScala.filterKeys(_.startsWith("password.encoder."))
if (!passwordEncoderConfigs.isEmpty) {
info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be used for encrypting" +
" passwords, but will not be stored in ZooKeeper.")
passwordEncoderConfigs.asScala.keySet.foreach(configsToBeAdded.remove)
}
DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
if (passwordConfigs.nonEmpty) {
require(passwordEncoderConfigs.containsKey(KafkaConfig.PasswordEncoderSecretProp),
s"${KafkaConfig.PasswordEncoderSecretProp} must be specified to update $passwordConfigs." +
" Other password encoder configs like cipher algorithm and iterations may also be specified" +
" to override the default encoding parameters. Password encoder configs will not be persisted" +
" in ZooKeeper."
)
val passwordEncoder = createPasswordEncoder(passwordEncoderConfigs.asScala)
passwordConfigs.foreach { configName =>
val encodedValue = passwordEncoder.encode(new Password(configsToBeAdded.getProperty(configName)))
configsToBeAdded.setProperty(configName, encodedValue)
}
}
}
private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
val configEntity = parseEntity(opts)
val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
@ -358,7 +421,12 @@ object ConfigCommand extends Config {
parseQuotaEntity(opts)
else {
// Exactly one entity type and at-most one entity name expected for other entities
val name = if (opts.options.has(opts.entityName)) Some(opts.options.valueOf(opts.entityName)) else None
val name = if (opts.options.has(opts.entityName))
Some(opts.options.valueOf(opts.entityName))
else if (entityTypes.head == ConfigType.Broker && opts.options.has(opts.entityDefault))
Some(ConfigEntityName.Default)
else
None
ConfigEntity(Entity(entityTypes.head, name), None)
}
}

View File

@ -115,6 +115,43 @@ object DynamicBrokerConfig {
}
}
def validateConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
if (invalidPropNames.nonEmpty)
throw new ConfigException(s"$errorMessage: $invalidPropNames")
}
checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically")
checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
"These security configs can be dynamically updated only per-listener using the listener prefix")
validateConfigTypes(props)
if (!perBrokerConfig) {
checkInvalidProps(perBrokerConfigs(props),
"Cannot update these configs at default cluster level, broker id must be specified")
}
}
private def perBrokerConfigs(props: Properties): Set[String] = {
val configNames = props.asScala.keySet
configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
}
private def nonDynamicConfigs(props: Properties): Set[String] = {
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
}
private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
DynamicSecurityConfigs.filter(props.containsKey)
}
private def validateConfigTypes(props: Properties): Unit = {
val baseProps = new Properties
props.asScala.foreach {
case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
case (k, v) => baseProps.put(k, v)
}
DynamicConfig.Broker.validate(baseProps)
}
private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
KafkaConfig.configKeys.filterKeys(AllDynamicConfigs.contains).values.foreach { config =>
configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
@ -298,57 +335,26 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) }
}
}
adminZkClient.changeBrokerConfig(Seq(kafkaConfig.brokerId), props)
adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props)
}
}
props
}
private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
if (invalidPropNames.nonEmpty)
throw new ConfigException(s"$errorMessage: $invalidPropNames")
}
checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically")
checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
"These security configs can be dynamically updated only per-listener using the listener prefix")
validateConfigTypes(props)
validateConfigs(props, perBrokerConfig)
val newProps = mutable.Map[String, String]()
newProps ++= staticBrokerConfigs
if (perBrokerConfig) {
overrideProps(newProps, dynamicDefaultConfigs)
overrideProps(newProps, props.asScala)
} else {
checkInvalidProps(perBrokerConfigs(props),
"Cannot update these configs at default cluster level, broker id must be specified")
overrideProps(newProps, props.asScala)
overrideProps(newProps, dynamicBrokerConfigs)
}
processReconfiguration(newProps, validateOnly = true)
}
private def perBrokerConfigs(props: Properties): Set[String] = {
val configNames = props.asScala.keySet
configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
}
private def nonDynamicConfigs(props: Properties): Set[String] = {
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
}
private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
DynamicSecurityConfigs.filter(props.containsKey)
}
private def validateConfigTypes(props: Properties): Unit = {
val baseProps = new Properties
props.asScala.foreach {
case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
case (k, v) => baseProps.put(k, v)
}
DynamicConfig.Broker.validate(baseProps)
}
private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
try {
validateConfigTypes(props)

View File

@ -265,6 +265,18 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
}
}
def parseBroker(broker: String): Option[Int] = {
broker match {
case ConfigEntityName.Default => None
case _ =>
try Some(broker.toInt)
catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
}
}
}
/**
* Change the configs for a given entityType and entityName
* @param entityType
@ -273,19 +285,11 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
*/
def changeConfigs(entityType: String, entityName: String, configs: Properties): Unit = {
def parseBroker(broker: String): Int = {
try broker.toInt
catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
}
}
entityType match {
case ConfigType.Topic => changeTopicConfig(entityName, configs)
case ConfigType.Client => changeClientIdConfig(entityName, configs)
case ConfigType.User => changeUserOrUserClientIdConfig(entityName, configs)
case ConfigType.Broker => changeBrokerConfig(Seq(parseBroker(entityName)), configs)
case ConfigType.Broker => changeBrokerConfig(parseBroker(entityName), configs)
case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
}
}

View File

@ -126,6 +126,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
val kafkaConfig = KafkaConfig.fromProps(props)
configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
servers += TestUtils.createServer(kafkaConfig)
}
@ -778,21 +779,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties]
val config = server.config
val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
val oldSecret = "old-dynamic-config-secret"
config.dynamicConfig.staticBrokerConfigs.put(KafkaConfig.PasswordEncoderOldSecretProp, oldSecret)
val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.isPasswordConfig)
assertTrue("Password configs not found", passwordConfigs.nonEmpty)
val passwordDecoder = new PasswordEncoder(secret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
val passwordEncoder = new PasswordEncoder(new Password(oldSecret),
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret)
val passwordEncoder = createPasswordEncoder(config, Some(new Password(oldSecret)))
passwordConfigs.foreach { case (name, value) =>
val decoded = passwordDecoder.decode(value).value
propsEncodedWithOldSecret.put(name, passwordEncoder.encode(new Password(decoded)))
@ -1161,12 +1153,39 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private def listenerPrefix(name: String): String = new ListenerName(name).configPrefix
private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = {
private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, sslProperties: Properties): Unit = {
val externalListenerPrefix = listenerPrefix(SecureExternal)
val sslStoreProps = new Properties
sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
val persistentProps = kafkaConfig.dynamicConfig.toPersistentProps(sslStoreProps, perBrokerConfig = true)
sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix)
sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
adminZkClient.changeBrokerConfig(brokers, persistentProps)
val args = Array("--zookeeper", kafkaConfig.zkConnect,
"--alter", "--add-config", sslStoreProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
"--entity-type", "brokers",
"--entity-name", kafkaConfig.brokerId.toString)
ConfigCommand.main(args)
val passwordEncoder = createPasswordEncoder(kafkaConfig, kafkaConfig.passwordEncoderSecret)
val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString)
assertEquals(4, brokerProps.size)
assertEquals(sslProperties.get(SSL_KEYSTORE_TYPE_CONFIG),
brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_TYPE_CONFIG"))
assertEquals(sslProperties.get(SSL_KEYSTORE_LOCATION_CONFIG),
brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_LOCATION_CONFIG"))
assertEquals(sslProperties.get(SSL_KEYSTORE_PASSWORD_CONFIG),
passwordEncoder.decode(brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_PASSWORD_CONFIG")))
assertEquals(sslProperties.get(SSL_KEY_PASSWORD_CONFIG),
passwordEncoder.decode(brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEY_PASSWORD_CONFIG")))
}
private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder = {
val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
new PasswordEncoder(encoderSecret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
}
private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {

View File

@ -20,21 +20,25 @@ import java.util
import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.api.ApiVersion
import kafka.cluster.{Broker, EndPoint}
import kafka.common.InvalidConfigException
import kafka.server.ConfigEntityName
import kafka.server.{ConfigEntityName, KafkaConfig}
import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable
import scala.collection.{Seq, mutable}
import scala.collection.JavaConverters._
class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@ -51,7 +55,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-name", "1",
"--entity-type", "brokers",
"--alter",
"--add-config", "message.max.size=100000"))
"--add-config", "security.inter.broker.protocol=PLAINTEXT"))
}
@Test
@ -306,14 +310,99 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test (expected = classOf[IllegalArgumentException])
def shouldNotUpdateDynamicBrokerConfigUsingZooKeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "1",
"--entity-type", "brokers",
"--alter",
"--add-config", "message.max.size=100000"))
ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
@Test
def testDynamicBrokerConfigUpdateUsingZooKeeper(): Unit = {
val brokerId = "1"
val adminZkClient = new AdminZkClient(zkClient)
val alterOpts = Array("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter")
def entityOpt(brokerId: Option[String]): Array[String] = {
brokerId.map(id => Array("--entity-name", id)).getOrElse(Array("--entity-default"))
}
def alterConfig(configs: Map[String, String], brokerId: Option[String],
encoderConfigs: Map[String, String] = Map.empty): Unit = {
val configStr = (configs ++ encoderConfigs).map { case (k, v) => s"$k=$v" }.mkString(",")
val addOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) ++ Array("--add-config", configStr))
ConfigCommand.alterConfig(zkClient, addOpts, adminZkClient)
}
def verifyConfig(configs: Map[String, String], brokerId: Option[String]): Unit = {
val entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.getOrElse(ConfigEntityName.Default))
assertEquals(configs, entityConfigs.asScala)
}
def alterAndVerifyConfig(configs: Map[String, String], brokerId: Option[String]): Unit = {
alterConfig(configs, brokerId)
verifyConfig(configs, brokerId)
}
def deleteAndVerifyConfig(configNames: Set[String], brokerId: Option[String]): Unit = {
val deleteOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) ++
Array("--delete-config", configNames.mkString(",")))
ConfigCommand.alterConfig(zkClient, deleteOpts, adminZkClient)
verifyConfig(Map.empty, brokerId)
}
// Add config
alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId))
alterAndVerifyConfig(Map("message.max.size" -> "120000"), None)
// Change config
alterAndVerifyConfig(Map("message.max.size" -> "130000"), Some(brokerId))
alterAndVerifyConfig(Map("message.max.size" -> "140000"), None)
// Delete config
deleteAndVerifyConfig(Set("message.max.size"), Some(brokerId))
deleteAndVerifyConfig(Set("message.max.size"), None)
// Listener configs: should work only with listener name
alterAndVerifyConfig(Map("listener.name.external.ssl.keystore.location" -> "/tmp/test.jks"), Some(brokerId))
intercept[ConfigException](alterConfig(Map("ssl.keystore.location" -> "/tmp/test.jks"), Some(brokerId)))
// Per-broker config configured at default cluster-level should fail
intercept[ConfigException](alterConfig(Map("listener.name.external.ssl.keystore.location" -> "/tmp/test.jks"), None))
deleteAndVerifyConfig(Set("listener.name.external.ssl.keystore.location"), Some(brokerId))
// Password config update without encoder secret should fail
intercept[IllegalArgumentException](alterConfig(Map("listener.name.external.ssl.keystore.password" -> "secret"), Some(brokerId)))
// Password config update with encoder secret should succeed and encoded password must be stored in ZK
val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads" -> "2")
val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret")
alterConfig(configs, Some(brokerId), encoderConfigs)
val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
assertFalse("Encoder secret stored in ZooKeeper", brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp))
assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded
val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
assertEquals("secret", passwordEncoder.decode(encodedPassword).value)
assertEquals(configs.size, brokerConfigs.size)
// Password config update with overrides for encoder parameters
val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2")
val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret",
KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
KafkaConfig.PasswordEncoderIterationsProp -> "1024",
KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> "PBKDF2WithHmacSHA1",
KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
alterConfig(configs2, Some(brokerId), encoderConfigs2)
val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")
assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs).decode(encodedPassword2).value)
assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2).decode(encodedPassword2).value)
// Password config update at default cluster-level should fail
intercept[ConfigException](alterConfig(configs, None, encoderConfigs))
// Dynamic config updates using ZK should fail if broker is running.
registerBrokerInZk(brokerId.toInt)
intercept[IllegalArgumentException](alterConfig(Map("message.max.size" -> "210000"), Some(brokerId)))
intercept[IllegalArgumentException](alterConfig(Map("message.max.size" -> "220000"), None))
// Dynamic config updates using ZK should for a different broker that is not running should succeed
alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
}
@Test (expected = classOf[IllegalArgumentException])
@ -322,7 +411,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-name", "1",
"--entity-type", "brokers",
"--alter",
"--add-config", "a="))
"--add-config", "a=="))
ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@ -593,6 +682,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
}
private def registerBrokerInZk(id: Int): Unit = {
zkClient.createTopLevelPaths()
val securityProtocol = SecurityProtocol.PLAINTEXT
val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), ApiVersion.latestVersion, jmxPort = 9192)
zkClient.registerBrokerInZk(brokerInfo)
}
class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties}

View File

@ -90,6 +90,23 @@
using <code>kafka-configs.sh</code> even if the password config is not being altered. This constraint will be removed in
a future release.</p>
<h5>Updating Password Configs in ZooKeeper Before Starting Brokers</h5>
From Kafka 2.0.0 onwards, <code>kafka-configs.sh</code> enables dynamic broker configs to be updated using ZooKeeper before
starting brokers for bootstrapping. This enables all password configs to be stored in encrypted form, avoiding the need for
clear passwords in <code>server.properties</code>. The broker config <code>password.encoder.secret</code> must also be specified
if any password configs are included in the alter command. Additional encryption parameters may also be specified. Password
encoder configs will not be persisted in ZooKeeper. For example, to store SSL key password for listener <code>INTERNAL</code>
on broker 0:
<pre class="brush: bash;">
&gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 0 --alter --add-config
'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'
</pre>
The configuration <code>listener.name.internal.ssl.key.password</code> will be persisted in ZooKeeper in encrypted
form using the provided encoder configs. The encoder secret and iterations are not persisted in ZooKeeper.
<h5>Updating SSL Keystore of an Existing Listener</h5>
Brokers may be configured with SSL keystores with short validity periods to reduce the risk of compromised certificates.
Keystores may be updated dynamically without restarting the broker. The config name must be prefixed with the listener prefix

View File

@ -127,6 +127,8 @@
<p>KIP-283 also adds new topic and broker configurations <code>message.downconversion.enable</code> and <code>log.message.downconversion.enable</code> respectively
to control whether down-conversion is enabled. When disabled, broker does not perform any down-conversion and instead sends an <code>UNSUPPORTED_VERSION</code>
error to the client.</p></li>
<li>Dynamic broker configuration options can be stored in ZooKeeper using kafka-configs.sh before brokers are started.
This option can be used to avoid storing clear passwords in server.properties as all password configs may be stored encrypted in ZooKeeper.</li>
</ul>
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>