kafka-937; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy

Jun Rao 2013-06-12 20:50:38 -07:00
parent 640026467c
commit 5bd33c1517
4 changed files with 35 additions and 53 deletions
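The central change sits in ConsumerFetcherManager: LeaderFinderThread used to call addFetcher() while still holding its lock, and the patch narrows the lock scope so fetchers are added only after lock.unlock(). A minimal sketch of that ordering, assuming stand-in names (pending, findLeaders, addFetcherFor) rather than the real Kafka internals:

    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    // Sketch of the ordering the patch moves to: gather results while locked,
    // release the lock, then make the calls that may block on other locks.
    object LockScopeSketch {
      private val lock = new ReentrantLock()
      private val pending = mutable.Set("t0-0", "t0-1")        // plays the role of noLeaderPartitionSet

      private def findLeaders(ps: Set[String]): Map[String, Int] =
        ps.map(p => p -> 9092).toMap                           // plays the role of the metadata lookup

      private def addFetcherFor(p: String, broker: Int): Unit =
        println(s"adding fetcher for $p -> broker $broker")    // plays the role of addFetcher

      def doWorkOnce(): Unit = {
        var leaders = Map.empty[String, Int]
        lock.lock()
        try {
          leaders = findLeaders(pending.toSet)                 // cheap bookkeeping only, under the lock
          pending --= leaders.keys
        } finally {
          lock.unlock()                                        // unlock before anything that can block
        }
        leaders.foreach { case (p, b) => addFetcherFor(p, b) } // potentially blocking work, lock-free
      }

      def main(args: Array[String]): Unit = doWorkOnce()
    }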


@@ -110,7 +110,7 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(10*1000)
.defaultsTo(ConsumerConfig.AutoCommitInterval)
val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
.withRequiredArg
.describedAs("num_messages")


@@ -51,6 +51,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
// thread responsible for adding the fetcher to the right broker when leader is available
override def doWork() {
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
lock.lock()
try {
if (noLeaderPartitionSet.isEmpty) {
@@ -58,7 +59,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
cond.await()
}
try {
trace("Partitions without leader %s".format(noLeaderPartitionSet))
val brokers = getAllBrokersInCluster(zkClient)
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
@@ -67,7 +67,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
config.socketTimeoutMs,
correlationId.getAndIncrement).topicsMetadata
if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
topicsMetadata.foreach { tmd =>
val topic = tmd.topic
tmd.partitionsMetadata.foreach { pmd =>
@@ -75,47 +74,28 @@ class ConsumerFetcherManager(private val consumerIdString: String,
if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
val leaderBroker = pmd.leader.get
leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
noLeaderPartitionSet -= topicAndPartition
}
}
}
} catch {
case t => {
if (!isRunning.get())
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else
warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
}
} finally {
lock.unlock()
}
leaderForPartitionsMap.foreach {
case(topicAndPartition, leaderBroker) =>
val pti = partitionMap(topicAndPartition)
try {
addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
noLeaderPartitionSet -= topicAndPartition
} catch {
case t => {
/*
* If we are shutting down (e.g., due to a rebalance) propagate this exception upward to avoid
* processing subsequent partitions without leader so the leader-finder-thread can exit.
* It is unfortunate that we depend on the following behavior and we should redesign this: as part of
* processing partitions, we catch the InterruptedException (thrown from addPartition's call to
* lockInterruptibly) when adding partitions, thereby clearing the interrupted flag. If we process
* more partitions, then the lockInterruptibly in addPartition will not throw an InterruptedException
* and we can run into the deadlock described in KAFKA-914.
*/
if (!isRunning.get())
throw t
else
warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
}
}
}
shutdownIdleFetcherThreads()
} catch {
case t => {
if (!isRunning.get())
throw t /* See above for why we need to propagate this exception. */
else
warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
}
}
} finally {
lock.unlock()
}
Thread.sleep(config.refreshLeaderBackoffMs)
}
}
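The comment deleted above records a pitfall that outlives this restructuring: lockInterruptibly() reports interruption by throwing InterruptedException, and that throw also clears the thread's interrupted status, so swallowing the exception means later lockInterruptibly() calls no longer throw and can block again. A generic sketch (not Kafka code) of the usual remedy, restoring the flag when the exception is caught:

    import java.util.concurrent.locks.ReentrantLock

    // Sketch: if InterruptedException is swallowed, re-set the interrupt flag
    // so the caller and any later interruptible call still see the interrupt.
    object InterruptFlagSketch {
      private val lock = new ReentrantLock()

      def withLockInterruptibly(body: => Unit): Boolean =
        try {
          lock.lockInterruptibly()               // throwing here clears the interrupted status
          try { body; true } finally { lock.unlock() }
        } catch {
          case _: InterruptedException =>
            Thread.currentThread().interrupt()   // restore the flag instead of losing the interrupt
            false
        }
    }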


@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
val brokerInfo = "host_%s-port_%s".format(host, port)
private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
private var isClosed = false
private def connect(): BlockingChannel = {
close
@@ -59,6 +60,7 @@ class SimpleConsumer(val host: String,
def close() {
lock synchronized {
disconnect()
isClosed = true
}
}
@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
private def getOrMakeConnection() {
if(!blockingChannel.isConnected) {
if(!isClosed && !blockingChannel.isConnected) {
connect()
}
}
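The two added lines give SimpleConsumer a close guard: once close() has set isClosed, getOrMakeConnection() will not reopen the channel, so a request issued during or after shutdown no longer silently reconnects. A self-contained sketch of the same guard, assuming simple connect()/disconnect() stubs in place of the real BlockingChannel:

    // Sketch of the isClosed guard; booleans stand in for the channel state.
    final class ClosableClient {
      private val lock = new Object
      private var connected = false
      private var isClosed = false

      private def connect(): Unit = { connected = true }       // stands in for opening the channel
      private def disconnect(): Unit = { connected = false }   // stands in for closing it

      def close(): Unit = lock.synchronized {
        disconnect()
        isClosed = true                                         // from now on, never reconnect
      }

      def getOrMakeConnection(): Boolean = lock.synchronized {
        if (!isClosed && !connected) connect()                  // same condition as the patched code
        connected                                               // stays false once closed
      }
    }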


@@ -96,8 +96,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
response = simpleConsumer.fetch(fetchRequest)
} catch {
case t =>
warn("Error in fetch %s".format(fetchRequest), t)
if (isRunning.get) {
warn("Error in fetch %s".format(fetchRequest), t)
partitionMapLock synchronized {
partitionsWithError ++= partitionMap.keys
}
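This last hunk only moves the warn inside the isRunning check, so fetch failures that are an expected side effect of shutting the thread down are not logged, while failures during normal operation still are. A small sketch of that guard, assuming an AtomicBoolean in place of the thread's running flag:

    import java.util.concurrent.atomic.AtomicBoolean
    import scala.collection.mutable

    // Sketch: record and warn about a fetch error only while the loop is
    // supposed to be running; the same exception during shutdown stays quiet.
    object FetchErrorSketch {
      private val isRunning = new AtomicBoolean(true)
      private val partitionsWithError = mutable.Set.empty[String]

      def fetchOnce(partitions: Set[String])(fetch: () => Unit): Unit =
        try fetch()
        catch {
          case t: Throwable =>
            if (isRunning.get) {
              println(s"WARN error in fetch: ${t.getMessage}")  // worth warning about only while running
              partitionsWithError ++= partitions                // mirrors the partitionsWithError bookkeeping
            }
        }

      def shutdown(): Unit = isRunning.set(false)
    }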