HOTFIX: Tools for releases prior to 0.10.1 need --new-consumer flag

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1931 from hachikuji/fix-broken-upgrade-tests
This commit is contained in:
Jason Gustafson 2016-09-29 07:48:40 +01:00 committed by Ismael Juma
parent 67e99d0869
commit a6f3cf56b3
2 changed files with 8 additions and 5 deletions

View File

@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, V_0_10_0_0 from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
""" """
0.8.2.1 ConsoleConsumer options 0.8.2.1 ConsoleConsumer options
@ -176,6 +176,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
"--topic %(topic)s --consumer.config %(config_file)s" % args "--topic %(topic)s --consumer.config %(config_file)s" % args
if self.new_consumer: if self.new_consumer:
if node.version <= LATEST_0_10_0:
cmd += " --new-consumer"
cmd += " --bootstrap-server %(broker_list)s" % args cmd += " --bootstrap-server %(broker_list)s" % args
else: else:
cmd += " --zookeeper %(zk_connect)s" % args cmd += " --zookeeper %(zk_connect)s" % args

View File

@ -18,7 +18,7 @@ import os
from kafkatest.services.performance import PerformanceService from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import TRUNK, V_0_9_0_0 from kafkatest.version import TRUNK, V_0_9_0_0, LATEST_0_10_0
class ConsumerPerformanceService(PerformanceService): class ConsumerPerformanceService(PerformanceService):
@ -97,8 +97,7 @@ class ConsumerPerformanceService(PerformanceService):
for node in self.nodes: for node in self.nodes:
node.version = version node.version = version
@property def args(self, version):
def args(self):
"""Dictionary of arguments used to start the Consumer Performance script.""" """Dictionary of arguments used to start the Consumer Performance script."""
args = { args = {
'topic': self.topic, 'topic': self.topic,
@ -106,6 +105,8 @@ class ConsumerPerformanceService(PerformanceService):
} }
if self.new_consumer: if self.new_consumer:
if version <= LATEST_0_10_0:
args['new-consumer'] = ""
args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
else: else:
args['zookeeper'] = self.kafka.zk.connect_setting() args['zookeeper'] = self.kafka.zk.connect_setting()
@ -135,7 +136,7 @@ class ConsumerPerformanceService(PerformanceService):
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
cmd += " %s" % self.path.script("kafka-consumer-perf-test.sh", node) cmd += " %s" % self.path.script("kafka-consumer-perf-test.sh", node)
for key, value in self.args.items(): for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value) cmd += " --%s %s" % (key, value)
if node.version >= V_0_9_0_0: if node.version >= V_0_9_0_0: