Add a queue of zookeeper notifications in the zookeeper consumer to reduce the number of rebalancing attempts; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-265

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1243721 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-02-13 22:31:02 +00:00
parent 6a3c2f2032
commit 732c27ff3e
1 changed files with 33 additions and 3 deletions

View File

@ -19,10 +19,10 @@ package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
import locks.ReentrantLock
import scala.collection._
import kafka.cluster._
import kafka.utils._
import mutable.ListBuffer
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
@ -373,11 +373,41 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
class ZKRebalancerListener[T](val group: String, val consumerIdString: String,
kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
extends IZkChildListener {
private val dirs = new ZKGroupDirs(group)
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
info("starting watcher executor thread for consumer " + consumerIdString)
while (!isShuttingDown.get) {
try {
lock.lock()
try {
if (!isWatcherTriggered)
cond.wait()
} finally {
isWatcherTriggered = false
lock.unlock()
}
syncedRebalance
} catch {
case t => error("error during syncedRebalance", t)
}
}
info("stopping watcher executor thread for consumer " + consumerIdString)
}
}
watcherExecutorThread.start()
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
syncedRebalance
lock.lock()
try {
isWatcherTriggered = true
cond.signalAll()
} finally {
lock.unlock()
}
}
private def releasePartitionOwnership()= {