KAFKA-695 Broker shuts down due to attempt to read a closed index file;reviewed by Neha Narkhede, Jay Kreps

This commit is contained in:
Jun Rao 2013-01-28 16:50:28 -08:00 committed by Neha Narkhede
parent 40a80fa7b7
commit 1fb3e8c037
1 changed files with 12 additions and 17 deletions

View File

@ -88,18 +88,11 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
expirationThread.start()
def purgeSatisfied() {
expiredRequestReaper.forcePurge()
}
/**
* Add a new delayed request watching the contained keys
*/
def watch(delayedRequest: T) {
if (requestCounter.getAndIncrement() >= purgeInterval) {
requestCounter.set(0)
purgeSatisfied()
}
requestCounter.getAndIncrement()
for(key <- delayedRequest.keys) {
var lst = watchersFor(key)
@ -218,15 +211,19 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
while(running.get) {
try {
val curr = pollExpired()
curr synchronized {
expire(curr)
if (curr != null) {
curr synchronized {
expire(curr)
}
}
} catch {
case ie: InterruptedException =>
if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
requestCounter.set(0)
val purged = purgeSatisfied()
debug("Purged %d requests from delay queue.".format(purged))
val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
}
} catch {
case e: Exception =>
error("Error in long poll expiry thread: ", e)
}
@ -240,10 +237,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
unsatisfied.incrementAndGet()
}
def forcePurge() {
expirationThread.interrupt()
}
/** Shutdown the expiry thread*/
def shutdown() {
debug("Shutting down.")
@ -261,7 +254,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
*/
private def pollExpired(): T = {
while(true) {
val curr = delayed.take()
val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
if (curr == null)
return null.asInstanceOf[T]
val updated = curr.satisfied.compareAndSet(false, true)
if(updated) {
unsatisfied.getAndDecrement()