diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 5bab08d473e..641218ed977 100755 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -70,7 +70,7 @@ class KafkaScheduler(val threads: Int, daemon: Boolean = true) extends Scheduler with Logging { private var executor: ScheduledThreadPoolExecutor = null private val schedulerThreadId = new AtomicInteger(0) - + override def startup() { debug("Initializing task scheduler.") this synchronized { @@ -88,12 +88,14 @@ class KafkaScheduler(val threads: Int, override def shutdown() { debug("Shutting down task scheduler.") - this synchronized { - if(isStarted) { - executor.shutdown() - executor.awaitTermination(1, TimeUnit.DAYS) + // We use the local variable to avoid NullPointerException if another thread shuts down scheduler at same time. + val cachedExecutor = this.executor + if (cachedExecutor != null) { + this synchronized { + cachedExecutor.shutdown() this.executor = null } + cachedExecutor.awaitTermination(1, TimeUnit.DAYS) } } @@ -101,10 +103,10 @@ class KafkaScheduler(val threads: Int, debug("Scheduling task %s with initial delay %d ms and period %d ms." .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit))) this synchronized { - ensureStarted + ensureRunning val runnable = CoreUtils.runnable { try { - trace("Begining execution of scheduled task '%s'.".format(name)) + trace("Beginning execution of scheduled task '%s'.".format(name)) fun() } catch { case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) @@ -125,8 +127,8 @@ class KafkaScheduler(val threads: Int, } } - private def ensureStarted = { + private def ensureRunning = { if(!isStarted) - throw new IllegalStateException("Kafka scheduler has not been started") + throw new IllegalStateException("Kafka scheduler is not running.") } }