KAFKA-1139 Topic data change handling callback should not call syncedRebalance directly; reviewed by Guozhang Wang and Jun Rao

Neha Narkhede 2013-12-19 12:54:10 -08:00
parent dd58d753ce
commit b5d16871c0
2 changed files with 72 additions and 59 deletions
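The gist of the change: the consumer's ZooKeeper watcher callbacks no longer run a full syncedRebalance() inline; they only mark that a rebalance is needed and signal the watcher-executor thread, which then performs the rebalance. Below is a minimal sketch of that signalling pattern with simplified stand-in names; it is not the ZookeeperConsumerConnector code itself, just the flag-plus-condition-variable idea the patch adopts.

    import java.util.concurrent.locks.ReentrantLock
    import java.util.concurrent.TimeUnit

    // Sketch: callbacks only flag a pending rebalance; a dedicated thread does the work.
    object RebalanceSignalSketch {
      private val lock = new ReentrantLock
      private val cond = lock.newCondition()
      private var isWatcherTriggered = false
      @volatile private var running = true

      // What a watcher callback (e.g. handleDataChange / handleChildChange) now does.
      def rebalanceEventTriggered(): Unit = {
        lock.lock()
        try {
          isWatcherTriggered = true
          cond.signalAll()
        } finally {
          lock.unlock()
        }
      }

      // Stand-in for the heavyweight syncedRebalance() work.
      private def syncedRebalance(): Unit = println("rebalancing...")

      // Watcher-executor loop: wake up when signalled (or on timeout) and rebalance once.
      private val watcherExecutorThread = new Thread("watcher-executor-sketch") {
        override def run(): Unit = {
          while (running) {
            var doRebalance = false
            lock.lock()
            try {
              if (!isWatcherTriggered)
                cond.await(1000, TimeUnit.MILLISECONDS)
              doRebalance = isWatcherTriggered
              isWatcherTriggered = false
            } finally {
              lock.unlock()
            }
            if (doRebalance) syncedRebalance()
          }
        }
      }

      def main(args: Array[String]): Unit = {
        watcherExecutorThread.start()
        rebalanceEventTriggered()   // simulate a topic data change notification
        Thread.sleep(100)
        running = false
        watcherExecutorThread.join()
      }
    }

Deferring the work this way keeps the rebalance off the ZkClient callback thread, and notifications that arrive while a rebalance is already pending collapse into a single trigger.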

TopicCommand.scala

@@ -25,6 +25,7 @@ import scala.collection._
 import scala.collection.JavaConversions._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
+import kafka.consumer.Whitelist
 
 object TopicCommand {
@@ -43,67 +44,79 @@ object TopicCommand {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
     val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
-    if(opts.options.has(opts.createOpt))
-      createTopic(zkClient, opts)
-    else if(opts.options.has(opts.alterOpt))
-      alterTopic(zkClient, opts)
-    else if(opts.options.has(opts.deleteOpt))
-      deleteTopic(zkClient, opts)
-    else if(opts.options.has(opts.listOpt))
-      listTopics(zkClient, opts)
-    else if(opts.options.has(opts.describeOpt))
-      describeTopic(zkClient, opts)
-    zkClient.close()
+    try {
+      if(opts.options.has(opts.createOpt))
+        createTopic(zkClient, opts)
+      else if(opts.options.has(opts.alterOpt))
+        alterTopic(zkClient, opts)
+      else if(opts.options.has(opts.deleteOpt))
+        deleteTopic(zkClient, opts)
+      else if(opts.options.has(opts.listOpt))
+        listTopics(zkClient, opts)
+      else if(opts.options.has(opts.describeOpt))
+        describeTopic(zkClient, opts)
+    } catch {
+      case e => println("Error while executing topic command", e)
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
+    val topicsSpec = opts.options.valueOf(opts.topicOpt)
+    val topicsFilter = new Whitelist(topicsSpec)
+    val allTopics = ZkUtils.getAllTopics(zkClient)
+    allTopics.filter(topicsFilter.isTopicAllowed).sorted
   }
   def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topics = opts.options.valuesOf(opts.topicOpt)
+    val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
-    for (topic <- topics) {
-      if (opts.options.has(opts.replicaAssignmentOpt)) {
-        val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
-      } else {
-        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
-        val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
-        val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-        AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
-      }
-      println("Created topic \"%s\".".format(topic))
+    if (opts.options.has(opts.replicaAssignmentOpt)) {
+      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
+    } else {
+      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
+      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
+      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+      AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
     }
+    println("Created topic \"%s\".".format(topic))
   }
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topic = opts.options.valueOf(opts.topicOpt)
-    if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
-      val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
-      val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
-      // compile the final set of configs
-      val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
-      configs.putAll(configsToBeAdded)
-      configsToBeDeleted.foreach(config => configs.remove(config))
-      AdminUtils.changeTopicConfig(zkClient, topic, configs)
-      println("Updated config for topic \"%s\".".format(topic))
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
+      if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+        val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+        val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+        // compile the final set of configs
+        val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+        configs.putAll(configsToBeAdded)
+        configsToBeDeleted.foreach(config => configs.remove(config))
+        AdminUtils.changeTopicConfig(zkClient, topic, configs)
+        println("Updated config for topic \"%s\".".format(topic))
+      }
+      if(opts.options.has(opts.partitionsOpt)) {
+        println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+          "logic or ordering of the messages will be affected")
+        val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+        val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
+        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+        println("adding partitions succeeded!")
+      }
+      if(opts.options.has(opts.replicationFactorOpt))
+        Utils.croak("Changing the replication factor is not supported.")
     }
-    if(opts.options.has(opts.partitionsOpt)) {
-      println("WARNING: If partitions are increased for a topic that has a key, the partition " +
-        "logic or ordering of the messages will be affected")
-      val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
-      val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-      AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
-      println("adding partitions succeeded!")
-    }
-    if(opts.options.has(opts.replicationFactorOpt))
-      Utils.croak("Changing the replication factor is not supported.")
   }
   def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    for(topic <- opts.options.valuesOf(opts.topicOpt)) {
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
       AdminUtils.deleteTopic(zkClient, topic)
       println("Topic \"%s\" deleted.".format(topic))
     }
@@ -128,9 +141,7 @@ object TopicCommand {
   }
 
   def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
-    if (topics.size <= 0)
-      topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
+    var topics = getTopics(zkClient, opts)
     val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
     val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
     val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
@@ -212,7 +223,8 @@ object TopicCommand {
     val deleteOpt = parser.accepts("delete", "Delete the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
+    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
+                                           "expression except for --create option")
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])
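With this change the --topic argument is treated as a whitelist pattern for every command except --create, via the new getTopics helper above. A small illustration of that filtering, using made-up topic names rather than anything from the patch:

    import kafka.consumer.Whitelist

    // Mirrors how getTopics filters the topics found in ZooKeeper through the
    // Whitelist built from the --topic argument; topic names are hypothetical.
    object TopicRegexExample {
      def main(args: Array[String]): Unit = {
        val filter = new Whitelist("clicks.*")
        val allTopics = Seq("clicks-us", "clicks-eu", "orders")
        println(allTopics.filter(filter.isTopicAllowed).sorted)  // List(clicks-eu, clicks-us)
      }
    }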

ZookeeperConsumerConnector.scala

@@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val messageStreamCreated = new AtomicBoolean(false)
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
-  private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null
+  private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
@@ -302,7 +302,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       registerConsumerInZK(dirs, consumerIdString, topicCount)
       // explicitly trigger load balancing for this consumer
       loadBalancerListener.syncedRebalance()
-
       // There is no need to resubscribe to child and state changes.
       // The child change watchers will be set inside rebalance when we read the children list.
     }
@@ -315,9 +314,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def handleDataChange(dataPath : String, data: Object) {
       try {
         info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance")
-        // explicitly trigger load balancing for this consumer
-        loadBalancerListener.syncedRebalance()
-
+        // queue up the rebalance event
+        loadBalancerListener.rebalanceEventTriggered()
         // There is no need to re-subscribe the watcher since it will be automatically
         // re-registered upon firing of this event by zkClient
       } catch {
@@ -335,7 +333,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
-    private val correlationId = new AtomicInteger(0)
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -367,6 +364,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      rebalanceEventTriggered()
+    }
+
+    def rebalanceEventTriggered() {
       inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
@@ -655,8 +656,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             dirs, consumerIdString, topicCount, loadBalancerListener)
       // create listener for topic partition change event if not exist yet
-      if (topicPartitionChangeListenner == null)
-        topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener)
+      if (topicPartitionChangeListener == null)
+        topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
       val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
@@ -714,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicStreamsMap.foreach { topicAndStreams =>
         // register on broker partition path changes
         val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
-        zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner)
+        zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
       }
       // explicitly trigger load balancing for this consumer