kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed Jun Rao

This commit is contained in:
Aditya Auradkar 2015-08-04 15:11:27 -07:00 committed by Jun Rao
parent 1a0179f21a
commit a56a79055d
17 changed files with 627 additions and 154 deletions

17
bin/kafka-configs.sh Executable file
View File

@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand $@

View File

@ -21,6 +21,7 @@ import kafka.common._
import kafka.cluster.{BrokerEndPoint, Broker}
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
import kafka.api.{TopicMetadata, PartitionMetadata}
@ -40,10 +41,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
object AdminUtils extends Logging {
val rand = new Random
val AdminClientId = "__admin_client"
val TopicConfigChangeZnodePrefix = "config_change_"
val EntityConfigChangeZnodePrefix = "config_change_"
/**
* There are 2 goals of replica assignment:
@ -103,14 +102,12 @@ object AdminUtils extends Logging {
* @param numPartitions Number of partitions to be set
* @param replicaAssignmentStr Manual replica assignment
* @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
* @param config Pre-existing properties that should be preserved
*/
def addPartitions(zkClient: ZkClient,
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
checkBrokerAvailable: Boolean = true,
config: Properties = new Properties) {
checkBrokerAvailable: Boolean = true) {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@ -137,7 +134,7 @@ object AdminUtils extends Logging {
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
@ -246,7 +243,6 @@ object AdminUtils extends Logging {
update: Boolean = false) {
// validate arguments
Topic.validate(topic)
LogConfig.validate(config)
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
val topicPath = ZkUtils.getTopicPath(topic)
@ -265,8 +261,12 @@ object AdminUtils extends Logging {
partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
// Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
writeTopicConfig(zkClient, topic, config)
LogConfig.validate(config)
writeEntityConfig(zkClient, ConfigType.Topic, topic, config)
}
// create the partition assignment
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
@ -291,6 +291,18 @@ object AdminUtils extends Logging {
}
}
/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
* @param clientId: The clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeClientIdConfig(zkClient: ZkClient, clientId: String, configs: Properties) {
changeEntityConfig(zkClient, ConfigType.Client, clientId, configs)
}
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
@ -302,34 +314,42 @@ object AdminUtils extends Logging {
def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
if(!topicExists(zkClient, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
// remove the topic overrides
LogConfig.validate(configs)
changeEntityConfig(zkClient, ConfigType.Topic, topic, configs)
}
private def changeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, configs: Properties) {
// write the new config--may not exist if there were previously no overrides
writeTopicConfig(zkClient, topic, configs)
writeEntityConfig(zkClient, entityType, entityName, configs)
// create the change notification
zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
zkClient.createPersistentSequential(seqNode, content)
}
def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName)
}
/**
* Write out the topic config to zk, if there is any
*/
private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
private def writeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, config: Properties) {
val configMap: mutable.Map[String, String] = {
import JavaConversions._
config
}
val map = Map("version" -> 1, "config" -> configMap)
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(entityType, entityName), Json.encode(map))
}
/**
* Read the topic config (if any) from zk
* Read the entity (topic or client) config (if any) from zk
*/
def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = {
val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true)
def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = {
val str: String = zkClient.readData(ZkUtils.getEntityConfigPath(entityType, entity), true)
val props = new Properties()
if(str != null) {
Json.parseFull(str) match {
@ -343,19 +363,20 @@ object AdminUtils extends Logging {
configTup match {
case (k: String, v: String) =>
props.setProperty(k, v)
case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
case o => throw new IllegalArgumentException("Unexpected value in config: " + str)
case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)"
.format(str, entityType, entity))
}
}
props
}
def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchEntityConfig(zkClient, ConfigType.Topic, topic))).toMap
def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import joptsimple._
import java.util.Properties
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils.{ZkUtils, CommandLineUtils}
import org.I0Itec.zkclient.ZkClient
import scala.collection._
import scala.collection.JavaConversions._
import org.apache.kafka.common.utils.Utils
/**
* This script can be used to change configs for topics/clients dynamically
*/
object ConfigCommand {
def main(args: Array[String]): Unit = {
val opts = new ConfigCommandOptions(args)
if(args.length == 0)
CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topic/client) configs")
opts.checkArgs()
val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
try {
if (opts.options.has(opts.alterOpt))
alterConfig(zkClient, opts)
else if (opts.options.has(opts.describeOpt))
describeConfig(zkClient, opts)
} catch {
case e: Throwable =>
println("Error while executing topic command " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
zkClient.close()
}
}
private def alterConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
val configsToBeAdded = parseConfigsToBeAdded(opts)
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
val entityType = opts.options.valueOf(opts.entityType)
val entityName = opts.options.valueOf(opts.entityName)
// compile the final set of configs
val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(config => configs.remove(config))
if (entityType.equals(ConfigType.Topic)) {
AdminUtils.changeTopicConfig(zkClient, entityName, configs)
println("Updated config for topic: \"%s\".".format(entityName))
} else {
AdminUtils.changeClientIdConfig(zkClient, entityName, configs)
println("Updated config for clientId: \"%s\".".format(entityName))
}
}
private def describeConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
val entityType = opts.options.valueOf(opts.entityType)
val entityNames: Seq[String] =
if (opts.options.has(opts.entityName))
Seq(opts.options.valueOf(opts.entityName))
else
ZkUtils.getAllEntitiesWithConfig(zkClient, entityType)
for (entityName <- entityNames) {
val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
println("Configs for %s:%s are %s"
.format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
}
}
private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.addedConfig).map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
"Invalid entity config: all configs to be added must be in the format \"key=val\".")
val props = new Properties
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
props
}
private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = {
if (opts.options.has(opts.deletedConfig)) {
val configsToBeDeleted = opts.options.valuesOf(opts.deletedConfig).map(_.trim())
val propsToBeDeleted = new Properties
configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
configsToBeDeleted
}
else
Seq.empty
}
class ConfigCommandOptions(args: Array[String]) {
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
val describeOpt = parser.accepts("describe", "List configs for the given entity.")
val entityType = parser.accepts("entity-type", "Type of entity (topic/client)")
.withRequiredArg
.ofType(classOf[String])
val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)")
.withRequiredArg
.ofType(classOf[String])
val nl = System.getProperty("line.separator")
val addedConfig = parser.accepts("added-config", "Key Value pairs configs to add 'k1=v1,k2=v2'. The following is a list of valid configurations: " +
"For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
"For entity_type '" + ConfigType.Client + "' currently no configs are processed by the brokers")
.withRequiredArg
.ofType(classOf[String])
.withValuesSeparatedBy(',')
val deletedConfig = parser.accepts("deleted-config", "config keys to remove 'k1,k2'")
.withRequiredArg
.ofType(classOf[String])
.withValuesSeparatedBy(',')
val helpOpt = parser.accepts("help", "Print usage information.")
val options = parser.parse(args : _*)
val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addedConfig, deletedConfig, helpOpt)
def checkArgs() {
// should have exactly one action
val actions = Seq(alterOpt, describeOpt).count(options.has _)
if(actions != 1)
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addedConfig, deletedConfig))
if(options.has(alterOpt)) {
if(! options.has(entityName))
throw new IllegalArgumentException("--entity-name must be specified with --alter")
val isAddedPresent: Boolean = options.has(addedConfig)
val isDeletedPresent: Boolean = options.has(deletedConfig)
if(! isAddedPresent && ! isDeletedPresent)
throw new IllegalArgumentException("At least one of --added-config or --deleted-config must be specified with --alter")
}
val entityTypeVal = options.valueOf(entityType)
if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client)) {
throw new IllegalArgumentException("--entity-type must be '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client))
}
}
}
}

View File

@ -20,6 +20,7 @@ package kafka.admin
import joptsimple._
import java.util.Properties
import kafka.common.{Topic, AdminCommandFailedException}
import kafka.utils.CommandLineUtils
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@ -27,6 +28,7 @@ import scala.collection._
import scala.collection.JavaConversions._
import kafka.log.LogConfig
import kafka.consumer.Whitelist
import kafka.server.{ConfigType, OffsetManager}
import org.apache.kafka.common.utils.Utils
import kafka.coordinator.ConsumerCoordinator
@ -106,16 +108,6 @@ object TopicCommand extends Logging {
opts.options.valueOf(opts.zkConnectOpt)))
}
topics.foreach { topic =>
val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
// compile the final set of configs
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(config => configs.remove(config))
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
if(opts.options.has(opts.partitionsOpt)) {
if (topic == ConsumerCoordinator.OffsetsTopicName) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
@ -124,7 +116,7 @@ object TopicCommand extends Logging {
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
println("Adding partitions succeeded!")
}
}
@ -180,7 +172,7 @@ object TopicCommand extends Logging {
val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
if (describeConfigs) {
val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
if (!reportOverriddenConfigs || configs.size() != 0) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.size
@ -219,18 +211,6 @@ object TopicCommand extends Logging {
props
}
def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
if (opts.options.has(opts.deleteConfigOpt)) {
val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
val propsToBeDeleted = new Properties
configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
LogConfig.validateNames(propsToBeDeleted)
configsToBeDeleted
}
else
Seq.empty
}
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
@ -256,7 +236,7 @@ object TopicCommand extends Logging {
val listOpt = parser.accepts("list", "List all available topics.")
val createOpt = parser.accepts("create", "Create a new topic.")
val deleteOpt = parser.accepts("delete", "Delete a topic")
val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
val alterOpt = parser.accepts("alter", "Alter the number of partitions and/or replica assignment for a topic")
val describeOpt = parser.accepts("describe", "List details for the given topics.")
val helpOpt = parser.accepts("help", "Print usage information.")
val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
@ -265,16 +245,12 @@ object TopicCommand extends Logging {
.describedAs("topic")
.ofType(classOf[String])
val nl = System.getProperty("line.separator")
val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
val configOpt = parser.accepts("config", "A configuration override for the topic being created." +
"The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
"See the Kafka documentation for full details on the topic configs.")
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).")
.withRequiredArg
.describedAs("name")
.ofType(classOf[String])
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
.withRequiredArg
@ -308,10 +284,11 @@ object TopicCommand extends Logging {
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
// Topic configs cannot be changed with alterTopic
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt))
if(options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,

View File

@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
import kafka.admin.AdminUtils
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager}
import kafka.server._
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
@ -86,7 +86,8 @@ class Partition(val topic: String,
case Some(replica) => replica
case None =>
if (isReplicaLocal(replicaId)) {
val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic))
val config = LogConfig.fromProps(logManager.defaultConfig.originals,
AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic))
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read

View File

@ -1037,8 +1037,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
// is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
// eventually be restored as the leader.
if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkClient,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr
}
@ -1322,7 +1322,8 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
processUpdateNotifications(topicAndPartitions)
// delete processed children
childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.TopicConfigChangesPath + "/" + x))
childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
ZkUtils.getEntityConfigPath(ConfigType.Topic, x)))
}
}

View File

@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr
import kafka.log.LogConfig
import kafka.utils.Logging
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.server.KafkaConfig
import kafka.server.{ConfigType, KafkaConfig}
trait PartitionLeaderSelector {
@ -61,8 +61,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkClient,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))

View File

@ -16,6 +16,9 @@
*/
package kafka.controller
import kafka.server.ConfigType
import collection.mutable
import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
import kafka.utils.CoreUtils._
@ -284,7 +287,7 @@ class TopicDeletionManager(controller: KafkaController,
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topic, topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.log.{Log, LogConfig, LogManager}
import kafka.utils.Pool
import scala.collection.mutable
/**
* The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
*/
trait ConfigHandler {
def processConfigChanges(entityName : String, value : Properties)
}
/**
* The TopicConfigHandler will process topic config changes in ZK.
* The callback provides the topic name and the full properties set read from ZK
*/
class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{
def processConfigChanges(topic : String, topicConfig : Properties) {
val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case (topicAndPartition, log) => topicAndPartition.topic }
.mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) }
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
props.putAll(logManager.defaultConfig.originals)
props.putAll(topicConfig)
val logConfig = LogConfig(props)
for (log <- logsByTopic(topic))
log.config = logConfig
}
}
}
/**
* The ClientIdConfigHandler will process clientId config changes in ZK.
* The callback provides the clientId and the full properties set read from ZK.
* This implementation does nothing currently. In the future, it will change quotas per client
*/
class ClientIdConfigHandler extends ConfigHandler {
val configPool = new Pool[String, Properties]()
def processConfigChanges(clientId : String, clientConfig : Properties): Unit = {
configPool.put(clientId, clientConfig)
}
}

View File

@ -17,36 +17,48 @@
package kafka.server
import java.util.Properties
import kafka.utils.Json
import kafka.utils.Logging
import kafka.utils.SystemTime
import kafka.utils.Time
import kafka.utils.ZkUtils
import scala.collection._
import kafka.log._
import kafka.utils._
import kafka.admin.AdminUtils
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
/**
* This class initiates and carries out topic config changes.
* Represents all the entities that can be configured via ZK
*/
object ConfigType {
val Topic = "topic"
val Client = "client"
}
/**
* This class initiates and carries out config changes for all entities defined in ConfigType.
*
* It works as follows.
*
* Config is stored under the path
* /config/topics/<topic_name>
* This znode stores the topic-overrides for this topic (but no defaults) in properties format.
* Config is stored under the path: /config/entityType/entityName
* E.g. /config/topics/<topic_name> and /config/clients/<clientId>
* This znode stores the overrides for this entity (but no defaults) in properties format.
*
* To avoid watching all topics for changes instead we have a notification path
* /config/changes
* The TopicConfigManager has a child watch on this path.
* The DynamicConfigManager has a child watch on this path.
*
* To update a topic config we first update the topic config properties. Then we create a new sequential
* znode under the change path which contains the name of the topic that was updated, say
* To update a config we first update the config properties. Then we create a new sequential
* znode under the change path which contains the name of the entityType and entityName that was updated, say
* /config/changes/config_change_13321
* This is just a notification--the actual config change is stored only once under the /config/topics/<topic_name> path.
* The sequential znode contains data in this format: {"version" : 1, "entityType":"topic/client", "entityName" : "topic_name/client_id"}
* This is just a notification--the actual config change is stored only once under the /config/entityType/entityName path.
*
* This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
* It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
* it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification.
* For any new changes it reads the new configuration, combines it with the defaults, and updates the log config
* for all logs for that topic (if any) that it has.
* For any new changes it reads the new configuration, combines it with the defaults, and updates the existing config.
*
* Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
* down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
@ -58,8 +70,8 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
* on startup where a change might be missed between the initial config load and registering for change notifications.
*
*/
class TopicConfigManager(private val zkClient: ZkClient,
private val logManager: LogManager,
class DynamicConfigManager(private val zkClient: ZkClient,
private val configHandler : Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
@ -68,8 +80,8 @@ class TopicConfigManager(private val zkClient: ZkClient,
* Begin watching for config changes
*/
def startup() {
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath)
zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
processAllConfigChanges()
}
@ -77,7 +89,7 @@ class TopicConfigManager(private val zkClient: ZkClient,
* Process all config changes
*/
private def processAllConfigChanges() {
val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
import JavaConversions._
processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
}
@ -89,39 +101,59 @@ class TopicConfigManager(private val zkClient: ZkClient,
if (notifications.size > 0) {
info("Processing config change notification(s)...")
val now = time.milliseconds
val logs = logManager.logsByTopicPartition.toBuffer
val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
if(jsonOpt.isDefined) {
val json = jsonOpt.get
val topic = json.substring(1, json.length - 1) // hacky way to dequote
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
props.putAll(logManager.defaultConfig.originals)
props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
val logConfig = LogConfig(props)
for (log <- logsByTopic(topic))
log.config = logConfig
info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
purgeObsoleteNotifications(now, notifications)
}
processNotification(jsonOpt)
}
lastExecutedChange = changeId
}
purgeObsoleteNotifications(now, notifications)
}
}
def processNotification(jsonOpt: Option[String]) = {
if(jsonOpt.isDefined) {
val json = jsonOpt.get
Json.parseFull(json) match {
case None => // There are no config overrides.
// Ignore non-json notifications because they can be from the deprecated TopicConfigManager
case Some(mapAnon: Map[_, _]) =>
val map = mapAnon collect
{ case (k: String, v: Any) => k -> v }
require(map("version") == 1)
val entityType = map.get("entity_type") match {
case Some(ConfigType.Topic) => ConfigType.Topic
case Some(ConfigType.Client) => ConfigType.Client
case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." +
" Received: " + json)
}
val entity = map.get("entity_name") match {
case Some(value: String) => value
case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
}
configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity))
case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
"{\"version\" : 1," +
" \"entity_type\":\"topic/client\"," +
" \"entity_name\" : \"topic_name/client_id\"}." +
" Received: " + json)
}
}
}
private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
for(notification <- notifications.sorted) {
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification)
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.EntityConfigChangesPath + "/" + notification)
if(jsonOpt.isDefined) {
val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
if (now - stat.getCtime > changeExpirationMs) {
debug("Purging config change notification " + notification)
ZkUtils.deletePath(zkClient, changeZnode)
@ -133,7 +165,7 @@ class TopicConfigManager(private val zkClient: ZkClient,
}
/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong
private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong
/**
* A listener that applies config changes to logs
@ -148,5 +180,4 @@ class TopicConfigManager(private val zkClient: ZkClient,
}
}
}
}

View File

@ -31,8 +31,9 @@ import java.io.File
import kafka.utils._
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.network.NetworkReceive
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import scala.collection.{JavaConversions, mutable}
import scala.collection.mutable
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.{EndPoint, Broker}
@ -77,7 +78,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var replicaManager: ReplicaManager = null
var topicConfigManager: TopicConfigManager = null
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
val metrics: Metrics = new Metrics()
var consumerCoordinator: ConsumerCoordinator = null
@ -171,9 +174,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Mx4jLoader.maybeLoad()
/* start topic config manager */
topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
ConfigType.Client -> new ClientIdConfigHandler)
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>

View File

@ -90,8 +90,8 @@ class ReplicaFetcherThread(name:String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkClient,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
" Current leader %d's latest offset %d is less than replica %d's latest offset %d"

View File

@ -19,6 +19,7 @@ package kafka.utils
import kafka.cluster._
import kafka.consumer.{ConsumerThreadId, TopicCount}
import kafka.server.ConfigType
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
ZkMarshallingError, ZkBadVersionException}
@ -39,8 +40,6 @@ object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val TopicConfigPath = "/config/topics"
val TopicConfigChangesPath = "/config/changes"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
@ -48,6 +47,8 @@ object ZkUtils extends Logging {
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val BrokerSequenceIdPath = "/brokers/seqid"
val IsrChangeNotificationPath = "/isr_change_notification"
val EntityConfigPath = "/config"
val EntityConfigChangesPath = "/config/changes"
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
@ -57,8 +58,11 @@ object ZkUtils extends Logging {
getTopicPath(topic) + "/partitions"
}
def getTopicConfigPath(topic: String): String =
TopicConfigPath + "/" + topic
def getEntityConfigRootPath(entityType: String): String =
EntityConfigPath + "/" + entityType
def getEntityConfigPath(entityType: String, entity: String): String =
getEntityConfigRootPath(entityType) + "/" + entity
def getDeleteTopicPath(topic: String): String =
DeleteTopicsPath + "/" + topic
@ -93,8 +97,14 @@ object ZkUtils extends Logging {
}
def setupCommonPaths(zkClient: ZkClient) {
for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
DeleteTopicsPath, BrokerSequenceIdPath))
for(path <- Seq(ConsumersPath,
BrokerIdsPath,
BrokerTopicsPath,
EntityConfigChangesPath,
ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
ZkUtils.getEntityConfigRootPath(ConfigType.Client),
DeleteTopicsPath,
BrokerSequenceIdPath))
makeSurePersistentPathExists(zkClient, path)
}
@ -753,6 +763,17 @@ object ZkUtils extends Logging {
topics
}
/**
* Returns all the entities whose configs have been overridden.
*/
def getAllEntitiesWithConfig(zkClient: ZkClient, entityType: String): Seq[String] = {
val entities = ZkUtils.getChildrenParentMayNotExist(zkClient, getEntityConfigRootPath(entityType))
if(entities == null)
Seq.empty[String]
else
entities
}
def getAllPartitions(zkClient: ZkClient): Set[TopicAndPartition] = {
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
if(topics == null) Set.empty[TopicAndPartition]

View File

@ -25,7 +25,7 @@ import kafka.log._
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{Logging, ZkUtils, TestUtils}
import kafka.common.{InvalidTopicException, TopicExistsException, TopicAndPartition}
import kafka.server.{KafkaServer, KafkaConfig}
import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
import java.io.File
import TestUtils._
@ -407,12 +407,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
checkConfig(maxMessageSize, retentionMs)
// now double the config values for the topic and check that it is applied
val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs)
AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
checkConfig(2*maxMessageSize, 2 * retentionMs)
// Verify that the same config can be read from ZK
val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topic, topic)
assertEquals(newConfig, configInZk)
} finally {
server.shutdown()
server.config.logDirs.foreach(CoreUtils.rm(_))
}
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import junit.framework.Assert._
import kafka.admin.ConfigCommand.ConfigCommandOptions
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@Test
def testArgumentParse() {
// Should parse correctly
var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "x",
"--entity-type", "client",
"--describe"))
createOpts.checkArgs()
// For --alter and added config
createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "x",
"--entity-type", "client",
"--alter",
"--added-config", "a=b,c=d"))
createOpts.checkArgs()
// For alter and deleted config
createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "x",
"--entity-type", "client",
"--alter",
"--deleted-config", "a,b,c"))
createOpts.checkArgs()
// For alter and both added, deleted config
createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "x",
"--entity-type", "client",
"--alter",
"--added-config", "a=b,c=d",
"--deleted-config", "a"))
createOpts.checkArgs()
val addedProps = ConfigCommand.parseConfigsToBeAdded(createOpts)
assertEquals(2, addedProps.size())
assertEquals("b", addedProps.getProperty("a"))
assertEquals("d", addedProps.getProperty("c"))
val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts)
assertEquals(1, deletedProps.size)
assertEquals("a", deletedProps(0))
}
}

View File

@ -22,6 +22,7 @@ import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
import kafka.coordinator.ConsumerCoordinator
@ -43,20 +44,18 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
TopicCommand.createTopic(zkClient, createOpts)
val props = AdminUtils.fetchTopicConfig(zkClient, topic)
val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
// pre-create the topic config changes path to avoid a NoNodeException
ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath)
ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath)
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString,
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
TopicCommand.alterTopic(zkClient, alterOpts)
val newProps = AdminUtils.fetchTopicConfig(zkClient, topic)
val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
}

View File

@ -19,6 +19,7 @@ package kafka.server
import java.util.Properties
import junit.framework.Assert._
import org.easymock.{Capture, EasyMock}
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
@ -32,8 +33,10 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testConfigChange() {
val oldVal: java.lang.Long = 100000
val newVal: java.lang.Long = 200000
assertTrue("Should contain a ConfigHandler for topics",
this.servers(0).dynamicConfigHandlers.contains(ConfigType.Topic))
val oldVal: java.lang.Long = 100000L
val newVal: java.lang.Long = 200000L
val tp = TopicAndPartition("test", 0)
val logProps = new Properties()
logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
@ -50,6 +53,25 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
}
}
// For now client config changes do not do anything. Simply verify that the call was made
@Test
def testClientConfigChange() {
assertTrue("Should contain a ConfigHandler for topics",
this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client))
val clientId = "testClient"
val props = new Properties()
props.put("a.b", "c")
props.put("x.y", "z")
AdminUtils.changeClientIdConfig(zkClient, clientId, props)
TestUtils.retry(10000) {
val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId))
assertEquals("ClientId testClient must be the only override", 1, configHandler.configPool.size)
assertEquals("c", configHandler.configPool.get(clientId).getProperty("a.b"))
assertEquals("z", configHandler.configPool.get(clientId).getProperty("x.y"))
}
}
@Test
def testConfigChangeOnNonExistingTopic() {
val topic = TestUtils.tempTopic
@ -63,4 +85,59 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
}
}
@Test
def testProcessNotification {
val props = new Properties()
props.put("a.b", "10")
// Create a mock ConfigHandler to record config changes it is asked to process
val entityArgument = new Capture[String]()
val propertiesArgument = new Capture[Properties]()
val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
handler.processConfigChanges(
EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))
EasyMock.expectLastCall().once()
EasyMock.replay(handler)
val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
// Notifications created using the old TopicConfigManager are ignored.
configManager.processNotification(Some("not json"))
// Incorrect Map. No version
try {
val jsonMap = Map("v" -> 1, "x" -> 2)
configManager.processNotification(Some(Json.encode(jsonMap)))
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
case t: Throwable =>
}
// Version is provided. EntityType is incorrect
try {
val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
configManager.processNotification(Some(Json.encode(jsonMap)))
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
case t: Throwable =>
}
// EntityName isn't provided
try {
val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
configManager.processNotification(Some(Json.encode(jsonMap)))
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
case t: Throwable =>
}
// Everything is provided
val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x")
configManager.processNotification(Some(Json.encode(jsonMap)))
// Verify that processConfigChanges was only called once
EasyMock.verify(handler)
}
}