KAFKA-3080: Fix ConsoleConsumerTest by checking version when service is started

The MessageFormatter being used was only introduced as of 0.9.0.0. The Kafka
version in some tests is changed dynamically, sometimes from trunk back to an
earlier version, so this option must be set based on the version used when the
service is started, not when it is created.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Geoff Anderson, Ismael Juma, Grant Henke

Closes #770 from ewencp/kafka-3080-system-test-console-consumer-version-failure
This commit is contained in:
Ewen Cheslack-Postava 2016-01-21 11:18:24 -08:00 committed by Gwen Shapira
parent 79cda0472b
commit 9577dc2358
1 changed files with 2 additions and 2 deletions

View File

@ -135,7 +135,6 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
self.client_id = client_id
self.print_key = print_key
self.log_values = True if version == TRUNK else False
self.log_level = "TRACE"
def prop_file(self, node):
@ -192,7 +191,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
if self.print_key:
cmd += " --property print.key=true"
if self.log_values:
# LoggingMessageFormatter was introduced in 0.9.0.0
if node.version > LATEST_0_8_2:
cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args