mirror of https://github.com/apache/kafka.git
KAFKA-124 Console consumer does not stop consuming if the program reading from standard out dies. Check for errors on the output stream and exit if no one is listening.
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1163911 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
92b1dd9252
commit
c247dfa884
|
@ -136,20 +136,27 @@ object ConsoleConsumer {
|
|||
} catch {
|
||||
case e =>
|
||||
if (skipMessageOnError)
|
||||
logger.error("error processing message, skipping and resume consumption: " + e)
|
||||
logger.error("Error processing message, skipping this message: ", e)
|
||||
else
|
||||
throw e
|
||||
}
|
||||
if(System.out.checkError()) {
|
||||
// This means no one is listening to our output stream any more, time to shutdown
|
||||
System.err.println("Unable to write to standard out, closing consumer.")
|
||||
formatter.close()
|
||||
connector.shutdown()
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case e => logger.error("error processing message, stop consuming: " + e)
|
||||
case e => logger.error("Error processing message, stopping consumer: ", e)
|
||||
}
|
||||
|
||||
System.out.flush()
|
||||
formatter.close()
|
||||
connector.shutdown()
|
||||
}
|
||||
|
||||
|
||||
def tryParse(parser: OptionParser, args: Array[String]) = {
|
||||
try {
|
||||
parser.parse(args : _*)
|
||||
|
|
Loading…
Reference in New Issue