mirror of https://github.com/apache/kafka.git
MINOR: fix shutdownHook in ConsoleConsumer
Author: Confluent <confluent@Confluents-MacBook-Pro.local> Reviewers: Jun Rao <junrao@gmail.com> Closes #548 from guozhangwang/HFConsoleConsumer
This commit is contained in:
parent
dacd21ec4e
commit
36b1c1dae2
|
|
@ -18,6 +18,7 @@
|
|||
package kafka.tools
|
||||
|
||||
import java.io.PrintStream
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.{Properties, Random}
|
||||
import joptsimple._
|
||||
import kafka.common.StreamEndException
|
||||
|
|
@ -26,6 +27,7 @@ import kafka.message._
|
|||
import kafka.metrics.KafkaMetricsReporter
|
||||
import kafka.utils._
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||
import org.apache.kafka.common.errors.WakeupException
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
|
|
@ -37,6 +39,8 @@ object ConsoleConsumer extends Logging {
|
|||
|
||||
var messageCount = 0
|
||||
|
||||
private val shutdownLatch = new CountDownLatch(1)
|
||||
|
||||
def main(args: Array[String]) {
|
||||
val conf = new ConsumerConfig(args)
|
||||
try {
|
||||
|
|
@ -70,6 +74,8 @@ object ConsoleConsumer extends Logging {
|
|||
// if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
|
||||
if (!conf.groupIdPassed)
|
||||
ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
|
||||
|
||||
shutdownLatch.countDown()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -91,6 +97,8 @@ object ConsoleConsumer extends Logging {
|
|||
Runtime.getRuntime.addShutdownHook(new Thread() {
|
||||
override def run() {
|
||||
consumer.stop()
|
||||
|
||||
shutdownLatch.await()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -105,6 +113,10 @@ object ConsoleConsumer extends Logging {
|
|||
trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
|
||||
// Consumer is already closed
|
||||
return
|
||||
case nse: WakeupException =>
|
||||
trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
|
||||
// Consumer will be closed
|
||||
return
|
||||
case e: Throwable =>
|
||||
error("Error processing message, terminating consumer process: ", e)
|
||||
// Consumer will be closed
|
||||
|
|
|
|||
Loading…
Reference in New Issue