From a56a79055dfba4687f476b0a4d20aeec1c4ebff7 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 4 Aug 2015 15:11:27 -0700 Subject: [PATCH] kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed Jun Rao --- bin/kafka-configs.sh | 17 ++ .../main/scala/kafka/admin/AdminUtils.scala | 83 +++++---- .../scala/kafka/admin/ConfigCommand.scala | 174 ++++++++++++++++++ .../main/scala/kafka/admin/TopicCommand.scala | 39 +--- .../main/scala/kafka/cluster/Partition.scala | 5 +- .../kafka/controller/KafkaController.scala | 7 +- .../controller/PartitionLeaderSelector.scala | 6 +- .../controller/TopicDeletionManager.scala | 5 +- .../scala/kafka/server/ConfigHandler.scala | 69 +++++++ ...nager.scala => DynamicConfigManager.scala} | 147 +++++++++------ .../main/scala/kafka/server/KafkaServer.scala | 15 +- .../kafka/server/ReplicaFetcherThread.scala | 4 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 33 +++- .../scala/unit/kafka/admin/AdminTest.scala | 8 +- .../unit/kafka/admin/ConfigCommandTest.scala | 73 ++++++++ .../unit/kafka/admin/TopicCommandTest.scala | 13 +- .../server/DynamicConfigChangeTest.scala | 83 ++++++++- 17 files changed, 627 insertions(+), 154 deletions(-) create mode 100755 bin/kafka-configs.sh create mode 100644 core/src/main/scala/kafka/admin/ConfigCommand.scala create mode 100644 core/src/main/scala/kafka/server/ConfigHandler.scala rename core/src/main/scala/kafka/server/{TopicConfigManager.scala => DynamicConfigManager.scala} (51%) create mode 100644 core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala diff --git a/bin/kafka-configs.sh b/bin/kafka-configs.sh new file mode 100755 index 00000000000..417eaf57e55 --- /dev/null +++ b/bin/kafka-configs.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand $@ diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 4cc237696c0..9966660cf66 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -21,6 +21,7 @@ import kafka.common._ import kafka.cluster.{BrokerEndPoint, Broker} import kafka.log.LogConfig +import kafka.server.ConfigType import kafka.utils._ import kafka.api.{TopicMetadata, PartitionMetadata} @@ -40,10 +41,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException object AdminUtils extends Logging { val rand = new Random - val AdminClientId = "__admin_client" - - val TopicConfigChangeZnodePrefix = "config_change_" + val EntityConfigChangeZnodePrefix = "config_change_" /** * There are 2 goals of replica assignment: @@ -103,14 +102,12 @@ object AdminUtils extends Logging { * @param numPartitions Number of partitions to be set * @param replicaAssignmentStr Manual replica assignment * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing - * @param config Pre-existing properties that should be preserved */ def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", - checkBrokerAvailable: Boolean = true, - config: Properties = new Properties) { + checkBrokerAvailable: Boolean = true) { val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) if (existingPartitionsReplicaList.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) @@ -137,7 +134,7 @@ object AdminUtils extends Logging { val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) // add the new list partitionReplicaList ++= newPartitionReplicaList - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true) } def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = { @@ -238,7 +235,7 @@ object AdminUtils extends Logging { val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig) } - + def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient, topic: String, partitionReplicaAssignment: Map[Int, Seq[Int]], @@ -246,7 +243,6 @@ object AdminUtils extends Logging { update: Boolean = false) { // validate arguments Topic.validate(topic) - LogConfig.validate(config) require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.") val topicPath = ZkUtils.getTopicPath(topic) @@ -264,10 +260,14 @@ object AdminUtils extends Logging { } partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) - - // write out the config if there is any, this isn't transactional with the partition assignments - writeTopicConfig(zkClient, topic, config) - + + // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported + if (!update) { + // write out the config if there is any, this isn't transactional with the partition assignments + LogConfig.validate(config) + writeEntityConfig(zkClient, ConfigType.Topic, topic, config) + } + // create the partition assignment writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update) } @@ -290,7 +290,19 @@ object AdminUtils extends Logging { case e2: Throwable => throw new AdminOperationException(e2.toString) } } - + + /** + * Update the config for a client and create a change notification so the change will propagate to other brokers + * @param zkClient: The ZkClient handle used to write the new config to zookeeper + * @param clientId: The clientId for which configs are being changed + * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or + * existing configs need to be deleted, it should be done prior to invoking this API + * + */ + def changeClientIdConfig(zkClient: ZkClient, clientId: String, configs: Properties) { + changeEntityConfig(zkClient, ConfigType.Client, clientId, configs) + } + /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers * @param zkClient: The ZkClient handle used to write the new config to zookeeper @@ -302,34 +314,42 @@ object AdminUtils extends Logging { def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) { if(!topicExists(zkClient, topic)) throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) - // remove the topic overrides LogConfig.validate(configs) - - // write the new config--may not exist if there were previously no overrides - writeTopicConfig(zkClient, topic, configs) - - // create the change notification - zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic)) + changeEntityConfig(zkClient, ConfigType.Topic, topic, configs) } - + + private def changeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, configs: Properties) { + // write the new config--may not exist if there were previously no overrides + writeEntityConfig(zkClient, entityType, entityName, configs) + + // create the change notification + val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix + val content = Json.encode(getConfigChangeZnodeData(entityType, entityName)) + zkClient.createPersistentSequential(seqNode, content) + } + + def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = { + Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName) + } + /** * Write out the topic config to zk, if there is any */ - private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { + private def writeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, config: Properties) { val configMap: mutable.Map[String, String] = { import JavaConversions._ config } val map = Map("version" -> 1, "config" -> configMap) - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(entityType, entityName), Json.encode(map)) } /** - * Read the topic config (if any) from zk + * Read the entity (topic or client) config (if any) from zk */ - def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = { - val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true) + def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = { + val str: String = zkClient.readData(ZkUtils.getEntityConfigPath(entityType, entity), true) val props = new Properties() if(str != null) { Json.parseFull(str) match { @@ -343,19 +363,20 @@ object AdminUtils extends Logging { configTup match { case (k: String, v: String) => props.setProperty(k, v) - case _ => throw new IllegalArgumentException("Invalid topic config: " + str) + case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str) } - case _ => throw new IllegalArgumentException("Invalid topic config: " + str) + case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str) } - case o => throw new IllegalArgumentException("Unexpected value in config: " + str) + case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)" + .format(str, entityType, entity)) } } props } def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] = - ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap + ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchEntityConfig(zkClient, ConfigType.Topic, topic))).toMap def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala new file mode 100644 index 00000000000..27594765882 --- /dev/null +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple._ +import java.util.Properties +import kafka.log.LogConfig +import kafka.server.ConfigType +import kafka.utils.{ZkUtils, CommandLineUtils} +import org.I0Itec.zkclient.ZkClient +import scala.collection._ +import scala.collection.JavaConversions._ +import org.apache.kafka.common.utils.Utils + + +/** + * This script can be used to change configs for topics/clients dynamically + */ +object ConfigCommand { + + def main(args: Array[String]): Unit = { + + val opts = new ConfigCommandOptions(args) + + if(args.length == 0) + CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topic/client) configs") + + opts.checkArgs() + + val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000) + + try { + if (opts.options.has(opts.alterOpt)) + alterConfig(zkClient, opts) + else if (opts.options.has(opts.describeOpt)) + describeConfig(zkClient, opts) + } catch { + case e: Throwable => + println("Error while executing topic command " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + zkClient.close() + } + } + + private def alterConfig(zkClient: ZkClient, opts: ConfigCommandOptions) { + val configsToBeAdded = parseConfigsToBeAdded(opts) + val configsToBeDeleted = parseConfigsToBeDeleted(opts) + val entityType = opts.options.valueOf(opts.entityType) + val entityName = opts.options.valueOf(opts.entityName) + + // compile the final set of configs + val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName) + configs.putAll(configsToBeAdded) + configsToBeDeleted.foreach(config => configs.remove(config)) + + if (entityType.equals(ConfigType.Topic)) { + AdminUtils.changeTopicConfig(zkClient, entityName, configs) + println("Updated config for topic: \"%s\".".format(entityName)) + } else { + AdminUtils.changeClientIdConfig(zkClient, entityName, configs) + println("Updated config for clientId: \"%s\".".format(entityName)) + } + } + + private def describeConfig(zkClient: ZkClient, opts: ConfigCommandOptions) { + val entityType = opts.options.valueOf(opts.entityType) + val entityNames: Seq[String] = + if (opts.options.has(opts.entityName)) + Seq(opts.options.valueOf(opts.entityName)) + else + ZkUtils.getAllEntitiesWithConfig(zkClient, entityType) + + for (entityName <- entityNames) { + val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName) + println("Configs for %s:%s are %s" + .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + } + } + + private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = { + val configsToBeAdded = opts.options.valuesOf(opts.addedConfig).map(_.split("""\s*=\s*""")) + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid entity config: all configs to be added must be in the format \"key=val\".") + val props = new Properties + configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + props + } + + private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = { + if (opts.options.has(opts.deletedConfig)) { + val configsToBeDeleted = opts.options.valuesOf(opts.deletedConfig).map(_.trim()) + val propsToBeDeleted = new Properties + configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, "")) + configsToBeDeleted + } + else + Seq.empty + } + + class ConfigCommandOptions(args: Array[String]) { + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") + val describeOpt = parser.accepts("describe", "List configs for the given entity.") + val entityType = parser.accepts("entity-type", "Type of entity (topic/client)") + .withRequiredArg + .ofType(classOf[String]) + val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)") + .withRequiredArg + .ofType(classOf[String]) + + val nl = System.getProperty("line.separator") + val addedConfig = parser.accepts("added-config", "Key Value pairs configs to add 'k1=v1,k2=v2'. The following is a list of valid configurations: " + + "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl + + "For entity_type '" + ConfigType.Client + "' currently no configs are processed by the brokers") + .withRequiredArg + .ofType(classOf[String]) + .withValuesSeparatedBy(',') + val deletedConfig = parser.accepts("deleted-config", "config keys to remove 'k1,k2'") + .withRequiredArg + .ofType(classOf[String]) + .withValuesSeparatedBy(',') + val helpOpt = parser.accepts("help", "Print usage information.") + val options = parser.parse(args : _*) + + val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addedConfig, deletedConfig, helpOpt) + + def checkArgs() { + // should have exactly one action + val actions = Seq(alterOpt, describeOpt).count(options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter") + + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType) + CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addedConfig, deletedConfig)) + if(options.has(alterOpt)) { + if(! options.has(entityName)) + throw new IllegalArgumentException("--entity-name must be specified with --alter") + + val isAddedPresent: Boolean = options.has(addedConfig) + val isDeletedPresent: Boolean = options.has(deletedConfig) + if(! isAddedPresent && ! isDeletedPresent) + throw new IllegalArgumentException("At least one of --added-config or --deleted-config must be specified with --alter") + } + val entityTypeVal = options.valueOf(entityType) + if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client)) { + throw new IllegalArgumentException("--entity-type must be '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client)) + } + } + } + +} diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 4e28bf1c084..f1405a5b296 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -20,6 +20,7 @@ package kafka.admin import joptsimple._ import java.util.Properties import kafka.common.{Topic, AdminCommandFailedException} +import kafka.utils.CommandLineUtils import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -27,6 +28,7 @@ import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.LogConfig import kafka.consumer.Whitelist +import kafka.server.{ConfigType, OffsetManager} import org.apache.kafka.common.utils.Utils import kafka.coordinator.ConsumerCoordinator @@ -106,16 +108,6 @@ object TopicCommand extends Logging { opts.options.valueOf(opts.zkConnectOpt))) } topics.foreach { topic => - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) - if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { - val configsToBeAdded = parseTopicConfigsToBeAdded(opts) - val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) - // compile the final set of configs - configs.putAll(configsToBeAdded) - configsToBeDeleted.foreach(config => configs.remove(config)) - AdminUtils.changeTopicConfig(zkClient, topic, configs) - println("Updated config for topic \"%s\".".format(topic)) - } if(opts.options.has(opts.partitionsOpt)) { if (topic == ConsumerCoordinator.OffsetsTopicName) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") @@ -124,7 +116,7 @@ object TopicCommand extends Logging { "logic or ordering of the messages will be affected") val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) - AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs) + AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) println("Adding partitions succeeded!") } } @@ -180,7 +172,7 @@ object TopicCommand extends Logging { val describePartitions: Boolean = !reportOverriddenConfigs val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) if (describeConfigs) { - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) if (!reportOverriddenConfigs || configs.size() != 0) { val numPartitions = topicPartitionAssignment.size val replicationFactor = topicPartitionAssignment.head._2.size @@ -219,18 +211,6 @@ object TopicCommand extends Logging { props } - def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = { - if (opts.options.has(opts.deleteConfigOpt)) { - val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim()) - val propsToBeDeleted = new Properties - configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, "")) - LogConfig.validateNames(propsToBeDeleted) - configsToBeDeleted - } - else - Seq.empty - } - def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { val partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() @@ -256,7 +236,7 @@ object TopicCommand extends Logging { val listOpt = parser.accepts("list", "List all available topics.") val createOpt = parser.accepts("create", "Create a new topic.") val deleteOpt = parser.accepts("delete", "Delete a topic") - val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.") + val alterOpt = parser.accepts("alter", "Alter the number of partitions and/or replica assignment for a topic") val describeOpt = parser.accepts("describe", "List details for the given topics.") val helpOpt = parser.accepts("help", "Print usage information.") val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " + @@ -265,16 +245,12 @@ object TopicCommand extends Logging { .describedAs("topic") .ofType(classOf[String]) val nl = System.getProperty("line.separator") - val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." + + val configOpt = parser.accepts("config", "A configuration override for the topic being created." + "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl + "See the Kafka documentation for full details on the topic configs.") .withRequiredArg .describedAs("name=value") .ofType(classOf[String]) - val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).") - .withRequiredArg - .describedAs("name") - .ofType(classOf[String]) val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " + "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected") .withRequiredArg @@ -308,10 +284,11 @@ object TopicCommand extends Logging { // check invalid args CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt)) CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt)) + // Topic configs cannot be changed with alterTopic + CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt)) if(options.has(createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt)) CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt, diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 2649090b6cb..511d3c929d0 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager} +import kafka.server._ import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -86,7 +86,8 @@ class Partition(val topic: String, case Some(replica) => replica case None => if (isReplicaLocal(replicaId)) { - val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic)) + val config = LogConfig.fromProps(logManager.defaultConfig.originals, + AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)) val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) val offsetMap = checkpoint.read diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b4fc755641b..684460286c8 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1037,8 +1037,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can // eventually be restored as the leader. - if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { + if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkClient, + ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition)) newIsr = leaderAndIsr.isr } @@ -1322,7 +1322,8 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil processUpdateNotifications(topicAndPartitions) // delete processed children - childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.TopicConfigChangesPath + "/" + x)) + childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, + ZkUtils.getEntityConfigPath(ConfigType.Topic, x))) } } diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index bb6b5c87645..4ebeb5a0ef3 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr import kafka.log.LogConfig import kafka.utils.Logging import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} -import kafka.server.KafkaConfig +import kafka.server.{ConfigType, KafkaConfig} trait PartitionLeaderSelector { @@ -61,8 +61,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi case true => // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration // for unclean leader election. - if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { + if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkClient, + ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { throw new NoReplicaOnlineException(("No broker in ISR for partition " + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 64ecb499f24..64b11df5e00 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -16,6 +16,9 @@ */ package kafka.controller + +import kafka.server.ConfigType + import collection.mutable import kafka.utils.{ShutdownableThread, Logging, ZkUtils} import kafka.utils.CoreUtils._ @@ -284,7 +287,7 @@ class TopicDeletionManager(controller: KafkaController, topicsToBeDeleted -= topic partitionsToBeDeleted.retain(_.topic != topic) controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) - controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic)) + controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topic, topic)) controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic)) controllerContext.removeTopic(topic) } diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala new file mode 100644 index 00000000000..8347a69a34c --- /dev/null +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.Properties + +import kafka.common.TopicAndPartition +import kafka.log.{Log, LogConfig, LogManager} +import kafka.utils.Pool + +import scala.collection.mutable + +/** + * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager + */ +trait ConfigHandler { + def processConfigChanges(entityName : String, value : Properties) +} + +/** + * The TopicConfigHandler will process topic config changes in ZK. + * The callback provides the topic name and the full properties set read from ZK + */ +class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{ + + def processConfigChanges(topic : String, topicConfig : Properties) { + val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer + val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case (topicAndPartition, log) => topicAndPartition.topic } + .mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) } + + if (logsByTopic.contains(topic)) { + /* combine the default properties with the overrides in zk to create the new LogConfig */ + val props = new Properties() + props.putAll(logManager.defaultConfig.originals) + props.putAll(topicConfig) + val logConfig = LogConfig(props) + for (log <- logsByTopic(topic)) + log.config = logConfig + } + } +} + +/** + * The ClientIdConfigHandler will process clientId config changes in ZK. + * The callback provides the clientId and the full properties set read from ZK. + * This implementation does nothing currently. In the future, it will change quotas per client + */ +class ClientIdConfigHandler extends ConfigHandler { + val configPool = new Pool[String, Properties]() + + def processConfigChanges(clientId : String, clientConfig : Properties): Unit = { + configPool.put(clientId, clientConfig) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala similarity index 51% rename from core/src/main/scala/kafka/server/TopicConfigManager.scala rename to core/src/main/scala/kafka/server/DynamicConfigManager.scala index 01b1b0a8efe..a66fb75f280 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -17,67 +17,79 @@ package kafka.server -import java.util.Properties +import kafka.utils.Json +import kafka.utils.Logging +import kafka.utils.SystemTime +import kafka.utils.Time +import kafka.utils.ZkUtils + import scala.collection._ -import kafka.log._ -import kafka.utils._ import kafka.admin.AdminUtils import org.I0Itec.zkclient.{IZkChildListener, ZkClient} + /** - * This class initiates and carries out topic config changes. - * + * Represents all the entities that can be configured via ZK + */ +object ConfigType { + val Topic = "topic" + val Client = "client" +} + +/** + * This class initiates and carries out config changes for all entities defined in ConfigType. + * * It works as follows. - * - * Config is stored under the path - * /config/topics/ - * This znode stores the topic-overrides for this topic (but no defaults) in properties format. - * + * + * Config is stored under the path: /config/entityType/entityName + * E.g. /config/topics/ and /config/clients/ + * This znode stores the overrides for this entity (but no defaults) in properties format. + * * To avoid watching all topics for changes instead we have a notification path * /config/changes - * The TopicConfigManager has a child watch on this path. - * - * To update a topic config we first update the topic config properties. Then we create a new sequential - * znode under the change path which contains the name of the topic that was updated, say + * The DynamicConfigManager has a child watch on this path. + * + * To update a config we first update the config properties. Then we create a new sequential + * znode under the change path which contains the name of the entityType and entityName that was updated, say * /config/changes/config_change_13321 - * This is just a notification--the actual config change is stored only once under the /config/topics/ path. - * + * The sequential znode contains data in this format: {"version" : 1, "entityType":"topic/client", "entityName" : "topic_name/client_id"} + * This is just a notification--the actual config change is stored only once under the /config/entityType/entityName path. + * * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications. * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds - * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification. - * For any new changes it reads the new configuration, combines it with the defaults, and updates the log config - * for all logs for that topic (if any) that it has. - * + * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification. + * For any new changes it reads the new configuration, combines it with the defaults, and updates the existing config. + * * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that - * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the + * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the * broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice, * but that is harmless. - * + * * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions * on startup where a change might be missed between the initial config load and registering for change notifications. - * + * */ -class TopicConfigManager(private val zkClient: ZkClient, - private val logManager: LogManager, - private val changeExpirationMs: Long = 15*60*1000, - private val time: Time = SystemTime) extends Logging { +class DynamicConfigManager(private val zkClient: ZkClient, + private val configHandler : Map[String, ConfigHandler], + private val changeExpirationMs: Long = 15*60*1000, + private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L - + /** * Begin watching for config changes */ def startup() { - ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath) - zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener) + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath) + zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener) processAllConfigChanges() } - + /** * Process all config changes */ private def processAllConfigChanges() { - val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath) + val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath) import JavaConversions._ processConfigChanges((configChanges: mutable.Buffer[String]).sorted) } @@ -89,39 +101,59 @@ class TopicConfigManager(private val zkClient: ZkClient, if (notifications.size > 0) { info("Processing config change notification(s)...") val now = time.milliseconds - val logs = logManager.logsByTopicPartition.toBuffer - val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) for (notification <- notifications) { val changeId = changeNumber(notification) + if (changeId > lastExecutedChange) { - val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) - if(jsonOpt.isDefined) { - val json = jsonOpt.get - val topic = json.substring(1, json.length - 1) // hacky way to dequote - if (logsByTopic.contains(topic)) { - /* combine the default properties with the overrides in zk to create the new LogConfig */ - val props = new Properties() - props.putAll(logManager.defaultConfig.originals) - props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) - val logConfig = LogConfig(props) - for (log <- logsByTopic(topic)) - log.config = logConfig - info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) - purgeObsoleteNotifications(now, notifications) - } - } - lastExecutedChange = changeId + processNotification(jsonOpt) } + lastExecutedChange = changeId + } + purgeObsoleteNotifications(now, notifications) + } + } + + def processNotification(jsonOpt: Option[String]) = { + if(jsonOpt.isDefined) { + val json = jsonOpt.get + Json.parseFull(json) match { + case None => // There are no config overrides. + // Ignore non-json notifications because they can be from the deprecated TopicConfigManager + case Some(mapAnon: Map[_, _]) => + val map = mapAnon collect + { case (k: String, v: Any) => k -> v } + require(map("version") == 1) + + val entityType = map.get("entity_type") match { + case Some(ConfigType.Topic) => ConfigType.Topic + case Some(ConfigType.Client) => ConfigType.Client + case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." + + " Received: " + json) + } + + val entity = map.get("entity_name") match { + case Some(value: String) => value + case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json) + } + configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity)) + + case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" + + "{\"version\" : 1," + + " \"entity_type\":\"topic/client\"," + + " \"entity_name\" : \"topic_name/client_id\"}." + + " Received: " + json) } } } - + private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { for(notification <- notifications.sorted) { - val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification) + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.EntityConfigChangesPath + "/" + notification) if(jsonOpt.isDefined) { - val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification if (now - stat.getCtime > changeExpirationMs) { debug("Purging config change notification " + notification) ZkUtils.deletePath(zkClient, changeZnode) @@ -131,10 +163,10 @@ class TopicConfigManager(private val zkClient: ZkClient, } } } - + /* get the change number from a change notification znode */ - private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong - + private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong + /** * A listener that applies config changes to logs */ @@ -148,5 +180,4 @@ class TopicConfigManager(private val zkClient: ZkClient, } } } - } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 18917bc4464..84d4730ac63 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -31,8 +31,9 @@ import java.io.File import kafka.utils._ import org.apache.kafka.common.metrics._ import org.apache.kafka.common.network.NetworkReceive +import org.apache.kafka.common.metrics.{JmxReporter, Metrics} -import scala.collection.{JavaConversions, mutable} +import scala.collection.mutable import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} @@ -77,7 +78,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var replicaManager: ReplicaManager = null - var topicConfigManager: TopicConfigManager = null + var dynamicConfigHandlers: Map[String, ConfigHandler] = null + var dynamicConfigManager: DynamicConfigManager = null + val metrics: Metrics = new Metrics() var consumerCoordinator: ConsumerCoordinator = null @@ -171,9 +174,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Mx4jLoader.maybeLoad() - /* start topic config manager */ - topicConfigManager = new TopicConfigManager(zkClient, logManager) - topicConfigManager.startup() + /* start dynamic config manager */ + dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager), + ConfigType.Client -> new ClientIdConfigHandler) + dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) + dynamicConfigManager.startup() /* tell everyone we are alive */ val listeners = config.advertisedListeners.map {case(protocol, endpoint) => diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index c89d00b5976..fae22d2af8d 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -90,8 +90,8 @@ class ReplicaFetcherThread(name:String, // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. - if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { + if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkClient, + ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + " Current leader %d's latest offset %d is less than replica %d's latest offset %d" diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 166814c2959..4ae310e08d5 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -19,6 +19,7 @@ package kafka.utils import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} +import kafka.server.ConfigType import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} @@ -39,8 +40,6 @@ object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" - val TopicConfigPath = "/config/topics" - val TopicConfigChangesPath = "/config/changes" val ControllerPath = "/controller" val ControllerEpochPath = "/controller_epoch" val ReassignPartitionsPath = "/admin/reassign_partitions" @@ -48,6 +47,8 @@ object ZkUtils extends Logging { val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" val IsrChangeNotificationPath = "/isr_change_notification" + val EntityConfigPath = "/config" + val EntityConfigChangesPath = "/config/changes" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -57,8 +58,11 @@ object ZkUtils extends Logging { getTopicPath(topic) + "/partitions" } - def getTopicConfigPath(topic: String): String = - TopicConfigPath + "/" + topic + def getEntityConfigRootPath(entityType: String): String = + EntityConfigPath + "/" + entityType + + def getEntityConfigPath(entityType: String, entity: String): String = + getEntityConfigRootPath(entityType) + "/" + entity def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic @@ -93,8 +97,14 @@ object ZkUtils extends Logging { } def setupCommonPaths(zkClient: ZkClient) { - for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, - DeleteTopicsPath, BrokerSequenceIdPath)) + for(path <- Seq(ConsumersPath, + BrokerIdsPath, + BrokerTopicsPath, + EntityConfigChangesPath, + ZkUtils.getEntityConfigRootPath(ConfigType.Topic), + ZkUtils.getEntityConfigRootPath(ConfigType.Client), + DeleteTopicsPath, + BrokerSequenceIdPath)) makeSurePersistentPathExists(zkClient, path) } @@ -753,6 +763,17 @@ object ZkUtils extends Logging { topics } + /** + * Returns all the entities whose configs have been overridden. + */ + def getAllEntitiesWithConfig(zkClient: ZkClient, entityType: String): Seq[String] = { + val entities = ZkUtils.getChildrenParentMayNotExist(zkClient, getEntityConfigRootPath(entityType)) + if(entities == null) + Seq.empty[String] + else + entities + } + def getAllPartitions(zkClient: ZkClient): Set[TopicAndPartition] = { val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) if(topics == null) Set.empty[TopicAndPartition] diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 93f200e3e6d..86dcc4c19eb 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -25,7 +25,7 @@ import kafka.log._ import kafka.zk.ZooKeeperTestHarness import kafka.utils.{Logging, ZkUtils, TestUtils} import kafka.common.{InvalidTopicException, TopicExistsException, TopicAndPartition} -import kafka.server.{KafkaServer, KafkaConfig} +import kafka.server.{ConfigType, KafkaServer, KafkaConfig} import java.io.File import TestUtils._ @@ -407,12 +407,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { checkConfig(maxMessageSize, retentionMs) // now double the config values for the topic and check that it is applied + val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs) AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) checkConfig(2*maxMessageSize, 2 * retentionMs) + + // Verify that the same config can be read from ZK + val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topic, topic) + assertEquals(newConfig, configInZk) } finally { server.shutdown() server.config.logDirs.foreach(CoreUtils.rm(_)) } } - } diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala new file mode 100644 index 00000000000..cfe0ec371f9 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import junit.framework.Assert._ +import kafka.admin.ConfigCommand.ConfigCommandOptions +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import kafka.utils.Logging +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.server.{ConfigType, OffsetManager, KafkaConfig} +import kafka.admin.TopicCommand.TopicCommandOptions +import kafka.utils.ZkUtils + +class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { + @Test + def testArgumentParse() { + // Should parse correctly + var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entity-name", "x", + "--entity-type", "client", + "--describe")) + createOpts.checkArgs() + + // For --alter and added config + createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entity-name", "x", + "--entity-type", "client", + "--alter", + "--added-config", "a=b,c=d")) + createOpts.checkArgs() + + // For alter and deleted config + createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entity-name", "x", + "--entity-type", "client", + "--alter", + "--deleted-config", "a,b,c")) + createOpts.checkArgs() + + // For alter and both added, deleted config + createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entity-name", "x", + "--entity-type", "client", + "--alter", + "--added-config", "a=b,c=d", + "--deleted-config", "a")) + createOpts.checkArgs() + val addedProps = ConfigCommand.parseConfigsToBeAdded(createOpts) + assertEquals(2, addedProps.size()) + assertEquals("b", addedProps.getProperty("a")) + assertEquals("d", addedProps.getProperty("c")) + + val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts) + assertEquals(1, deletedProps.size) + assertEquals("a", deletedProps(0)) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index dcd69881445..58adef63cfd 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -22,6 +22,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.Logging import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness +import kafka.server.{ConfigType, OffsetManager, KafkaConfig} import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils import kafka.coordinator.ConsumerCoordinator @@ -43,20 +44,18 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin "--config", cleanupKey + "=" + cleanupVal, "--topic", topic)) TopicCommand.createTopic(zkClient, createOpts) - val props = AdminUtils.fetchTopicConfig(zkClient, topic) + val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey)) assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal)) // pre-create the topic config changes path to avoid a NoNodeException - ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath) + ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath) // modify the topic to add new partitions val numPartitionsModified = 3 - val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, - "--config", cleanupKey + "=" + cleanupVal, - "--topic", topic)) + val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic)) TopicCommand.alterTopic(zkClient, alterOpts) - val newProps = AdminUtils.fetchTopicConfig(zkClient, topic) + val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey)) assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal)) } @@ -99,4 +98,4 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin } assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath)) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 8a871cfaf6a..7c453936751 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -19,6 +19,7 @@ package kafka.server import java.util.Properties import junit.framework.Assert._ +import org.easymock.{Capture, EasyMock} import org.junit.Test import kafka.integration.KafkaServerTestHarness import kafka.utils._ @@ -32,8 +33,10 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testConfigChange() { - val oldVal: java.lang.Long = 100000 - val newVal: java.lang.Long = 200000 + assertTrue("Should contain a ConfigHandler for topics", + this.servers(0).dynamicConfigHandlers.contains(ConfigType.Topic)) + val oldVal: java.lang.Long = 100000L + val newVal: java.lang.Long = 200000L val tp = TopicAndPartition("test", 0) val logProps = new Properties() logProps.put(LogConfig.FlushMessagesProp, oldVal.toString) @@ -50,6 +53,25 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { } } + // For now client config changes do not do anything. Simply verify that the call was made + @Test + def testClientConfigChange() { + assertTrue("Should contain a ConfigHandler for topics", + this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client)) + val clientId = "testClient" + val props = new Properties() + props.put("a.b", "c") + props.put("x.y", "z") + AdminUtils.changeClientIdConfig(zkClient, clientId, props) + TestUtils.retry(10000) { + val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler] + assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId)) + assertEquals("ClientId testClient must be the only override", 1, configHandler.configPool.size) + assertEquals("c", configHandler.configPool.get(clientId).getProperty("a.b")) + assertEquals("z", configHandler.configPool.get(clientId).getProperty("x.y")) + } + } + @Test def testConfigChangeOnNonExistingTopic() { val topic = TestUtils.tempTopic @@ -63,4 +85,59 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { } } -} \ No newline at end of file + @Test + def testProcessNotification { + val props = new Properties() + props.put("a.b", "10") + + // Create a mock ConfigHandler to record config changes it is asked to process + val entityArgument = new Capture[String]() + val propertiesArgument = new Capture[Properties]() + val handler = EasyMock.createNiceMock(classOf[ConfigHandler]) + handler.processConfigChanges( + EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])), + EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties]))) + EasyMock.expectLastCall().once() + EasyMock.replay(handler) + + val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler)) + // Notifications created using the old TopicConfigManager are ignored. + configManager.processNotification(Some("not json")) + + // Incorrect Map. No version + try { + val jsonMap = Map("v" -> 1, "x" -> 2) + configManager.processNotification(Some(Json.encode(jsonMap))) + fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) + } + catch { + case t: Throwable => + } + // Version is provided. EntityType is incorrect + try { + val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x") + configManager.processNotification(Some(Json.encode(jsonMap))) + fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) + } + catch { + case t: Throwable => + } + + // EntityName isn't provided + try { + val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic) + configManager.processNotification(Some(Json.encode(jsonMap))) + fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) + } + catch { + case t: Throwable => + } + + // Everything is provided + val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x") + configManager.processNotification(Some(Json.encode(jsonMap))) + + // Verify that processConfigChanges was only called once + EasyMock.verify(handler) + } +}