From 36b1c1dae292b9a43f56c385de13b89dfd03cad8 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 17 Nov 2015 18:00:09 -0800 Subject: [PATCH] MINOR: fix shutdownHook in ConsoleConsumer Author: Confluent Reviewers: Jun Rao Closes #548 from guozhangwang/HFConsoleConsumer --- .../src/main/scala/kafka/tools/ConsoleConsumer.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 24f2ac12060..2b9eb8476ec 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -18,6 +18,7 @@ package kafka.tools import java.io.PrintStream +import java.util.concurrent.CountDownLatch import java.util.{Properties, Random} import joptsimple._ import kafka.common.StreamEndException @@ -26,6 +27,7 @@ import kafka.message._ import kafka.metrics.KafkaMetricsReporter import kafka.utils._ import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.utils.Utils import scala.collection.JavaConversions._ @@ -37,6 +39,8 @@ object ConsoleConsumer extends Logging { var messageCount = 0 + private val shutdownLatch = new CountDownLatch(1) + def main(args: Array[String]) { val conf = new ConsumerConfig(args) try { @@ -70,6 +74,8 @@ object ConsoleConsumer extends Logging { // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack if (!conf.groupIdPassed) ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id")) + + shutdownLatch.countDown() } } @@ -91,6 +97,8 @@ object ConsoleConsumer extends Logging { Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { consumer.stop() + + shutdownLatch.await() } }) } @@ -105,6 +113,10 @@ object ConsoleConsumer extends Logging { trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.") // Consumer is already closed return + case nse: WakeupException => + trace("Caught WakeupException because consumer is shutdown, ignore and terminate.") + // Consumer will be closed + return case e: Throwable => error("Error processing message, terminating consumer process: ", e) // Consumer will be closed