diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index cce2e2d4b04..aa03bd7a33c 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -19,10 +19,10 @@ package kafka.consumer import java.util.concurrent._ import java.util.concurrent.atomic._ +import locks.ReentrantLock import scala.collection._ import kafka.cluster._ import kafka.utils._ -import mutable.ListBuffer import org.I0Itec.zkclient.exception.ZkNodeExistsException import java.net.InetAddress import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} @@ -373,11 +373,41 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, class ZKRebalancerListener[T](val group: String, val consumerIdString: String, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) extends IZkChildListener { - private val dirs = new ZKGroupDirs(group) + private var isWatcherTriggered = false + private val lock = new ReentrantLock + private val cond = lock.newCondition() + private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { + override def run() { + info("starting watcher executor thread for consumer " + consumerIdString) + while (!isShuttingDown.get) { + try { + lock.lock() + try { + if (!isWatcherTriggered) + cond.wait() + } finally { + isWatcherTriggered = false + lock.unlock() + } + syncedRebalance + } catch { + case t => error("error during syncedRebalance", t) + } + } + info("stopping watcher executor thread for consumer " + consumerIdString) + } + } + watcherExecutorThread.start() @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - syncedRebalance + lock.lock() + try { + isWatcherTriggered = true + cond.signalAll() + } finally { + lock.unlock() + } } private def releasePartitionOwnership()= {