mirror of https://github.com/apache/kafka.git
KAFKA-2784: swallow exceptions when mirror maker exits.
Author: Jiangjie Qin <becket.qin@gmail.com> Reviewers: Gwen Shapira Closes #478 from becketqin/KAFKA-2784
This commit is contained in:
parent
fe1fd703e8
commit
9cd65c46a7
|
@ -425,14 +425,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
|
|||
info("Flushing producer.")
|
||||
producer.flush()
|
||||
info("Committing consumer offsets.")
|
||||
try {
|
||||
commitOffsets(mirrorMakerConsumer)
|
||||
} catch {
|
||||
case e: WakeupException => // just ignore
|
||||
}
|
||||
CoreUtils.swallow(commitOffsets(mirrorMakerConsumer))
|
||||
info("Shutting down consumer connectors.")
|
||||
// we do not need to call consumer.close() since the consumer has already been interrupted
|
||||
mirrorMakerConsumer.cleanup()
|
||||
CoreUtils.swallow(mirrorMakerConsumer.stop())
|
||||
CoreUtils.swallow(mirrorMakerConsumer.cleanup())
|
||||
shutdownLatch.countDown()
|
||||
info("Mirror maker thread stopped")
|
||||
// if it exits accidentally, stop the entire mirror maker
|
||||
|
@ -572,7 +568,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
|
|||
}
|
||||
|
||||
override def cleanup() {
|
||||
ClientUtils.swallow(consumer.close())
|
||||
consumer.close()
|
||||
}
|
||||
|
||||
override def commit() {
|
||||
|
|
Loading…
Reference in New Issue