mirror of https://github.com/apache/kafka.git
KAFKA-2454; Deadlock between log segment deletion and server shutdown.
Author: Jiangjie Qin <becket.qin@gmail.com> Reviewers: Joel Koshy <jjkoshy.w@gmail.com> Closes #153 from becketqin/KAFKA-2454
This commit is contained in:
parent
86eb74d923
commit
3005653813
|
@ -70,7 +70,7 @@ class KafkaScheduler(val threads: Int,
|
|||
daemon: Boolean = true) extends Scheduler with Logging {
|
||||
private var executor: ScheduledThreadPoolExecutor = null
|
||||
private val schedulerThreadId = new AtomicInteger(0)
|
||||
|
||||
|
||||
override def startup() {
|
||||
debug("Initializing task scheduler.")
|
||||
this synchronized {
|
||||
|
@ -88,12 +88,14 @@ class KafkaScheduler(val threads: Int,
|
|||
|
||||
override def shutdown() {
|
||||
debug("Shutting down task scheduler.")
|
||||
this synchronized {
|
||||
if(isStarted) {
|
||||
executor.shutdown()
|
||||
executor.awaitTermination(1, TimeUnit.DAYS)
|
||||
// We use the local variable to avoid NullPointerException if another thread shuts down scheduler at same time.
|
||||
val cachedExecutor = this.executor
|
||||
if (cachedExecutor != null) {
|
||||
this synchronized {
|
||||
cachedExecutor.shutdown()
|
||||
this.executor = null
|
||||
}
|
||||
cachedExecutor.awaitTermination(1, TimeUnit.DAYS)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,10 +103,10 @@ class KafkaScheduler(val threads: Int,
|
|||
debug("Scheduling task %s with initial delay %d ms and period %d ms."
|
||||
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
|
||||
this synchronized {
|
||||
ensureStarted
|
||||
ensureRunning
|
||||
val runnable = CoreUtils.runnable {
|
||||
try {
|
||||
trace("Begining execution of scheduled task '%s'.".format(name))
|
||||
trace("Beginning execution of scheduled task '%s'.".format(name))
|
||||
fun()
|
||||
} catch {
|
||||
case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
|
||||
|
@ -125,8 +127,8 @@ class KafkaScheduler(val threads: Int,
|
|||
}
|
||||
}
|
||||
|
||||
private def ensureStarted = {
|
||||
private def ensureRunning = {
|
||||
if(!isStarted)
|
||||
throw new IllegalStateException("Kafka scheduler has not been started")
|
||||
throw new IllegalStateException("Kafka scheduler is not running.")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue