KAFKA-3597; Query ConsoleConsumer and VerifiableProducer if they shutdown cleanly

Even if a test calls stop() on console_consumer or verifiable_producer, it is still possible that producer/consumer will not shutdown cleanly, and will be killed forcefully after a timeout. It will be useful for some tests to know whether a clean shutdown happened or not. This PR adds methods to console_consumer and verifiable_producer to query whether clean shutdown happened or not.

hachikuji and/or granders Please review.

Author: Anna Povzner <anna@confluent.io>

Reviewers: Jason Gustafson, Geoff Anderson, Gwen Shapira

Closes #1278 from apovzner/kafka-3597
This commit is contained in:
Anna Povzner 2016-04-29 10:51:29 -07:00 committed by Gwen Shapira
parent eb50d2f6ca
commit e29eac4bbb
4 changed files with 33 additions and 4 deletions

View File

@ -103,6 +103,10 @@ object ConsoleConsumer extends Logging {
consumer.stop()
shutdownLatch.await()
if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}
@ -253,6 +257,9 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("deserializer for values")
.ofType(classOf[String])
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)")
if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
@ -260,6 +267,7 @@ object ConsoleConsumer extends Logging {
var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
val useNewConsumer = options.has(useNewConsumerOpt)
val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
// If using old consumer, exactly one of whitelist/blacklist/topic is required.
// If using new consumer, topic must be specified.

View File

@ -123,6 +123,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
self.clean_shutdown_nodes = set()
self.client_id = client_id
self.print_key = print_key
self.log_level = "TRACE"
@ -185,6 +186,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
if node.version > LATEST_0_9:
cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
cmd += " --enable-systest-events"
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd
@ -226,6 +228,11 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
for line in itertools.chain([first_line], consumer_output):
msg = line.strip()
if msg == "shutdown_complete":
if node in self.clean_shutdown_nodes:
raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
self.clean_shutdown_nodes.add(node)
else:
if self.message_validator is not None:
msg = self.message_validator(msg)
if msg is not None:

View File

@ -71,6 +71,7 @@ class VerifiableProducer(BackgroundThreadService):
self.acked_values = []
self.not_acked_values = []
self.produced_count = {}
self.clean_shutdown_nodes = set()
self.acks = acks
@ -139,6 +140,11 @@ class VerifiableProducer(BackgroundThreadService):
last_produced_time = t
prev_msg = data
elif data["name"] == "shutdown_complete":
if node in self.clean_shutdown_nodes:
raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
self.clean_shutdown_nodes.add(node)
def start_cmd(self, node, idx):
cmd = ""

View File

@ -247,6 +247,14 @@ public class VerifiableProducer {
/** Close the producer to flush any remaining messages. */
public void close() {
producer.close();
System.out.println(shutdownString());
}
String shutdownString() {
Map<String, Object> data = new HashMap<>();
data.put("class", this.getClass().toString());
data.put("name", "shutdown_complete");
return toJsonString(data);
}
/**