From 1e806f2a4cf5f66e810ca1abc38b12228d3ae797 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Thu, 20 Aug 2015 13:42:04 -0700 Subject: [PATCH] Various cleanups per review. --- .../sanity_checks/test_console_consumer.py | 4 +- .../sanity_checks/test_mirror_maker.py | 57 ++++++-------- tests/kafkatest/services/console_consumer.py | 6 +- tests/kafkatest/services/mirror_maker.py | 78 ++++++++----------- .../templates/console_consumer.properties | 4 +- .../services/templates/consumer.properties | 2 +- 6 files changed, 67 insertions(+), 84 deletions(-) diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 370f5c4b8ac..3e523e12157 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -15,7 +15,6 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until -from ducktape.mark import parametrize from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -75,8 +74,7 @@ class ConsoleConsumerTest(Test): assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 self.consumer.stop_node(node) - wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, - err_msg="Timed out waiting for consumer to stop.") + diff --git a/tests/kafkatest/sanity_checks/test_mirror_maker.py b/tests/kafkatest/sanity_checks/test_mirror_maker.py index a91a5841af0..332f9fac110 100644 --- a/tests/kafkatest/sanity_checks/test_mirror_maker.py +++ b/tests/kafkatest/sanity_checks/test_mirror_maker.py @@ -15,7 +15,6 @@ from ducktape.tests.test import Test -from ducktape.utils.util import wait_until from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -25,50 +24,40 @@ from kafkatest.services.mirror_maker import MirrorMaker class TestMirrorMakerService(Test): - """Sanity checks on console consumer service class.""" + """Sanity checks on mirror maker service class.""" def __init__(self, test_context): super(TestMirrorMakerService, self).__init__(test_context) self.topic = "topic" - self.zk1 = ZookeeperService(test_context, num_nodes=1) - self.zk2 = ZookeeperService(test_context, num_nodes=1) + self.source_zk = ZookeeperService(test_context, num_nodes=1) + self.target_zk = ZookeeperService(test_context, num_nodes=1) - self.k1 = KafkaService(test_context, num_nodes=1, zk=self.zk1, + self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk, topics={self.topic: {"partitions": 1, "replication-factor": 1}}) - self.k2 = KafkaService(test_context, num_nodes=1, zk=self.zk2, + self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk, topics={self.topic: {"partitions": 1, "replication-factor": 1}}) self.num_messages = 1000 # This will produce to source kafka cluster - self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.k1, topic=self.topic, + self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic, max_messages=self.num_messages, throughput=1000) - self.mirror_maker = MirrorMaker(test_context, sources=[self.k1], target=self.k2, whitelist=self.topic) + + # Use a regex whitelist to check that the start command is well-formed in this case + self.mirror_maker = MirrorMaker(test_context, source=self.source_kafka, target=self.target_kafka, + whitelist=".*", consumer_timeout_ms=10000) # This will consume from target kafka cluster - self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.k2, topic=self.topic, + self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic, consumer_timeout_ms=10000) def setUp(self): # Source cluster - self.zk1.start() - self.k1.start() + self.source_zk.start() + self.source_kafka.start() # Target cluster - self.zk2.start() - self.k2.start() - - def test_lifecycle(self): - """Start and stop a single-node MirrorMaker and validate that the process appears and disappears in a - reasonable amount of time. - """ - self.mirror_maker.start() - node = self.mirror_maker.nodes[0] - wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, - err_msg="Mirror maker took too long to start.") - - self.mirror_maker.stop() - wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, - err_msg="Mirror maker took to long to stop.") + self.target_zk.start() + self.target_kafka.start() def test_end_to_end(self): """ @@ -80,19 +69,21 @@ class TestMirrorMakerService(Test): - Start mirror maker. - Produce a small number of messages to the source cluster. - Consume messages from target. - - Confirm that number of consumed messages matches the number produced. + - Verify that number of consumed messages matches the number produced. """ self.mirror_maker.start() - node = self.mirror_maker.nodes[0] - wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, - err_msg="Mirror maker took too long to start.") + # Check that consumer_timeout_ms setting made it to config file + self.mirror_maker.node.account.ssh( + "grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False) self.producer.start() self.producer.wait() self.consumer.start() self.consumer.wait() - assert len(self.consumer.messages_consumed[1]) == self.num_messages + + num_consumed = len(self.consumer.messages_consumed[1]) + num_produced = self.num_messages + assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed) self.mirror_maker.stop() - wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, - err_msg="Mirror maker took to long to stop.") + diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index fb300361de5..9d8ce8e781a 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -14,8 +14,10 @@ # limitations under the License. from ducktape.services.background_thread import BackgroundThreadService +from ducktape.utils.util import wait_until import os +import subprocess def is_int(msg): @@ -141,7 +143,7 @@ class ConsoleConsumer(BackgroundThreadService): cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except: + except (subprocess.CalledProcessError, ValueError) as e: return [] def alive(self, node): @@ -180,6 +182,8 @@ class ConsoleConsumer(BackgroundThreadService): def stop_node(self, node): node.account.kill_process("java", allow_fail=True) + wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2, + err_msg="Timed out waiting for consumer to stop.") def clean_node(self, node): node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py index a6a1448d60c..c2dd2c1b052 100644 --- a/tests/kafkatest/services/mirror_maker.py +++ b/tests/kafkatest/services/mirror_maker.py @@ -14,9 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.services.background_thread import BackgroundThreadService +from ducktape.services.service import Service +from ducktape.utils.util import wait_until import os +import subprocess """ 0.8.2.1 MirrorMaker options @@ -51,27 +53,8 @@ Option Description --whitelist Whitelist of topics to mirror. """ -""" -consumer config -zookeeper.connect={{ zookeeper_connect }} -zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }} -group.id={{ group_id|default('test-consumer-group') }} - -{% if consumer_timeout_ms is not none %} -consumer.timeout.ms={{ consumer_timeout_ms }} -{% endif %} -""" - -""" -producer config - -metadata.broker.list={{ metadata_broker_list }} -producer.type={{ producer_type }} # sync or async -""" - - -class MirrorMaker(BackgroundThreadService): +class MirrorMaker(Service): # Root directory for persistent output PERSISTENT_ROOT = "/mnt/mirror_maker" @@ -79,6 +62,7 @@ class MirrorMaker(BackgroundThreadService): LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log") LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties") + CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties") KAFKA_HOME = "/opt/kafka/" logs = { @@ -87,14 +71,14 @@ class MirrorMaker(BackgroundThreadService): "collect_default": True} } - def __init__(self, context, sources, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None): + def __init__(self, context, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None): """ MirrorMaker mirrors messages from one or more source clusters to a single destination cluster. Args: context: standard context - sources: list of one or more source kafka clusters - target: target cluster + source: source Kafka cluster + target: target Kafka cluster to which data will be mirrored whitelist: whitelist regex for topics to mirror blacklist: blacklist regex for topics not to mirror num_streams: number of consumer threads to create @@ -106,50 +90,51 @@ class MirrorMaker(BackgroundThreadService): self.num_streams = num_streams self.whitelist = whitelist self.blacklist = blacklist - self.sources = sources + self.source = source self.target = target - def consumer_config_file(self, kafka): - return "consumer%s.properties" % kafka.zk.connect_setting() - @property def start_cmd(self): cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME - for kafka in self.sources: - # One consumer config file per source kafka cluster - cmd += " --consumer.config %s" % self.consumer_config_file(kafka) + cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG cmd += " --num.streams %d" % self.num_streams if self.whitelist is not None: - cmd += " --whitelist=%s" % self.whitelist + cmd += " --whitelist=\"%s\"" % self.whitelist if self.blacklist is not None: - cmd += " --blacklist=%s" % self.blacklist - cmd += " 2>&1 1>>%s &" % MirrorMaker.LOG_FILE + cmd += " --blacklist=\"%s\"" % self.blacklist + cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE) return cmd + @property + def node(self): + """Convenience method since this Service only ever has one node""" + if self.nodes: + return self.nodes[0] + else: + return None + def pids(self, node): try: cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except: + except (subprocess.CalledProcessError, ValueError) as e: return [] - def alive(self, node): - return len(self.pids(node)) > 0 + def alive(self): + return len(self.pids(self.node)) > 0 - def _worker(self, idx, node): + def start_node(self, node): node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False) - # Create, upload one consumer config file per source kafka cluster - for kafka in self.sources: - consumer_props = self.render('consumer.properties', zookeeper_connect=kafka.zk.connect_setting(), - consumer_timeout_ms=self.consumer_timeout_ms) - node.account.create_file(self.consumer_config_file(kafka), consumer_props) + # Create, upload one consumer config file for source cluster + consumer_props = self.render('consumer.properties', zookeeper_connect=self.source.zk.connect_setting()) + node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props) # Create, upload producer properties file for target cluster producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(), @@ -162,11 +147,16 @@ class MirrorMaker(BackgroundThreadService): # Run mirror maker cmd = self.start_cmd - self.logger.debug("Mirror maker %d command: %s", idx, cmd) + self.logger.debug("Mirror maker command: %s", cmd) node.account.ssh(cmd, allow_fail=False) + wait_until(lambda: self.alive(), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took to long to start.") + self.logger.debug("Mirror maker is alive") def stop_node(self, node): node.account.kill_process("java", allow_fail=True) + wait_until(lambda: not self.alive(), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took to long to stop.") def clean_node(self, node): node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties index 944c2c9e811..7143179748d 100644 --- a/tests/kafkatest/services/templates/console_consumer.properties +++ b/tests/kafkatest/services/templates/console_consumer.properties @@ -14,6 +14,6 @@ # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults -{% if consumer_timeout_ms is not none %} +{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %} consumer.timeout.ms={{ consumer_timeout_ms }} -{% endif %} \ No newline at end of file +{% endif %} diff --git a/tests/kafkatest/services/templates/consumer.properties b/tests/kafkatest/services/templates/consumer.properties index 8ac53b854e1..b8723d14fa1 100644 --- a/tests/kafkatest/services/templates/consumer.properties +++ b/tests/kafkatest/services/templates/consumer.properties @@ -18,6 +18,6 @@ zookeeper.connect={{ zookeeper_connect }} zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }} group.id={{ group_id|default('test-consumer-group') }} -{% if consumer_timeout_ms is not none %} +{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %} consumer.timeout.ms={{ consumer_timeout_ms }} {% endif %}