From c247dfa8844f6d112755f21918dc1515db37a05e Mon Sep 17 00:00:00 2001 From: Edward Jay Kreps Date: Thu, 1 Sep 2011 05:47:21 +0000 Subject: [PATCH] KAFKA-124 Console consumer does not stop consuming if the program reading from standard out dies. Check for errors on the output stream and exit if no one is listening. git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1163911 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/consumer/ConsoleConsumer.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 341f6348171..21aacccd81d 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -136,20 +136,27 @@ object ConsoleConsumer { } catch { case e => if (skipMessageOnError) - logger.error("error processing message, skipping and resume consumption: " + e) + logger.error("Error processing message, skipping this message: ", e) else throw e } + if(System.out.checkError()) { + // This means no one is listening to our output stream any more, time to shutdown + System.err.println("Unable to write to standard out, closing consumer.") + formatter.close() + connector.shutdown() + System.exit(1) + } } } catch { - case e => logger.error("error processing message, stop consuming: " + e) + case e => logger.error("Error processing message, stopping consumer: ", e) } System.out.flush() formatter.close() connector.shutdown() } - + def tryParse(parser: OptionParser, args: Array[String]) = { try { parser.parse(args : _*)