MirrorMaker can now run as a multi-node service. Added kill -9 to various clean_node methods.

This commit is contained in:
Geoff Anderson 2015-08-21 16:51:13 -07:00
parent 1e806f2a4c
commit c7c3ebdf2f
6 changed files with 52 additions and 27 deletions

View File

@ -43,12 +43,12 @@ class TestMirrorMakerService(Test):
max_messages=self.num_messages, throughput=1000) max_messages=self.num_messages, throughput=1000)
# Use a regex whitelist to check that the start command is well-formed in this case # Use a regex whitelist to check that the start command is well-formed in this case
self.mirror_maker = MirrorMaker(test_context, source=self.source_kafka, target=self.target_kafka, self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
whitelist=".*", consumer_timeout_ms=10000) whitelist=".*", consumer_timeout_ms=2000)
# This will consume from target kafka cluster # This will consume from target kafka cluster
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic, self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
consumer_timeout_ms=10000) consumer_timeout_ms=1000)
def setUp(self): def setUp(self):
# Source cluster # Source cluster
@ -64,7 +64,7 @@ class TestMirrorMakerService(Test):
Test end-to-end behavior under non-failure conditions. Test end-to-end behavior under non-failure conditions.
Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster. Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
One is source, and the other is target. One is source, and the other is target. Single-node mirror maker mirrors from source to target.
- Start mirror maker. - Start mirror maker.
- Produce a small number of messages to the source cluster. - Produce a small number of messages to the source cluster.
@ -73,7 +73,7 @@ class TestMirrorMakerService(Test):
""" """
self.mirror_maker.start() self.mirror_maker.start()
# Check that consumer_timeout_ms setting made it to config file # Check that consumer_timeout_ms setting made it to config file
self.mirror_maker.node.account.ssh( self.mirror_maker.nodes[0].account.ssh(
"grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False) "grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)
self.producer.start() self.producer.start()
@ -82,7 +82,8 @@ class TestMirrorMakerService(Test):
self.consumer.wait() self.consumer.wait()
num_consumed = len(self.consumer.messages_consumed[1]) num_consumed = len(self.consumer.messages_consumed[1])
num_produced = self.num_messages num_produced = self.producer.num_acked
assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed) assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)
self.mirror_maker.stop() self.mirror_maker.stop()

View File

@ -186,5 +186,9 @@ class ConsoleConsumer(BackgroundThreadService):
err_msg="Timed out waiting for consumer to stop.") err_msg="Timed out waiting for consumer to stop.")
def clean_node(self, node): def clean_node(self, node):
if self.alive(node):
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
(self.__class__.__name__, node.account))
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)

View File

@ -93,6 +93,7 @@ class KafkaService(Service):
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False) node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
def clean_node(self, node): def clean_node(self, node):
node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False) node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
def create_topic(self, topic_cfg): def create_topic(self, topic_cfg):

View File

@ -71,7 +71,7 @@ class MirrorMaker(Service):
"collect_default": True} "collect_default": True}
} }
def __init__(self, context, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None): def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
""" """
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster. MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
@ -81,27 +81,34 @@ class MirrorMaker(Service):
target: target Kafka cluster to which data will be mirrored target: target Kafka cluster to which data will be mirrored
whitelist: whitelist regex for topics to mirror whitelist: whitelist regex for topics to mirror
blacklist: blacklist regex for topics not to mirror blacklist: blacklist regex for topics not to mirror
num_streams: number of consumer threads to create num_streams: number of consumer threads to create; can be a single int, or a list with
one value per node, allowing num_streams to be the same for each node,
or configured independently per-node
consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages
""" """
super(MirrorMaker, self).__init__(context, num_nodes=1) super(MirrorMaker, self).__init__(context, num_nodes=num_nodes)
self.consumer_timeout_ms = consumer_timeout_ms self.consumer_timeout_ms = consumer_timeout_ms
self.num_streams = num_streams self.num_streams = num_streams
if not isinstance(num_streams, int):
# if not an integer, num_streams should be configured per-node
assert len(num_streams) == num_nodes
self.whitelist = whitelist self.whitelist = whitelist
self.blacklist = blacklist self.blacklist = blacklist
self.source = source self.source = source
self.target = target self.target = target
@property def start_cmd(self, node):
def start_cmd(self):
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
cmd += " --num.streams %d" % self.num_streams if isinstance(self.num_streams, int):
cmd += " --num.streams %d" % self.num_streams
else:
# config num_streams separately on each node
cmd += " --num.streams %d" % self.num_streams[self.idx(node) - 1]
if self.whitelist is not None: if self.whitelist is not None:
cmd += " --whitelist=\"%s\"" % self.whitelist cmd += " --whitelist=\"%s\"" % self.whitelist
if self.blacklist is not None: if self.blacklist is not None:
@ -109,14 +116,6 @@ class MirrorMaker(Service):
cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE) cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
return cmd return cmd
@property
def node(self):
"""Convenience method since this Service only ever has one node"""
if self.nodes:
return self.nodes[0]
else:
return None
def pids(self, node): def pids(self, node):
try: try:
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'" cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
@ -125,8 +124,8 @@ class MirrorMaker(Service):
except (subprocess.CalledProcessError, ValueError) as e: except (subprocess.CalledProcessError, ValueError) as e:
return [] return []
def alive(self): def alive(self, node):
return len(self.pids(self.node)) > 0 return len(self.pids(node)) > 0
def start_node(self, node): def start_node(self, node):
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
@ -146,18 +145,21 @@ class MirrorMaker(Service):
node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config) node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
# Run mirror maker # Run mirror maker
cmd = self.start_cmd cmd = self.start_cmd(node)
self.logger.debug("Mirror maker command: %s", cmd) self.logger.debug("Mirror maker command: %s", cmd)
node.account.ssh(cmd, allow_fail=False) node.account.ssh(cmd, allow_fail=False)
wait_until(lambda: self.alive(), timeout_sec=10, backoff_sec=.5, wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took too long to start.") err_msg="Mirror maker took too long to start.")
self.logger.debug("Mirror maker is alive") self.logger.debug("Mirror maker is alive")
def stop_node(self, node): def stop_node(self, node):
node.account.kill_process("java", allow_fail=True) node.account.kill_process("java", allow_fail=True)
wait_until(lambda: not self.alive(), timeout_sec=10, backoff_sec=.5, wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took too long to stop.") err_msg="Mirror maker took too long to stop.")
def clean_node(self, node): def clean_node(self, node):
if self.alive(node):
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
(self.__class__.__name__, node.account))
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)

View File

@ -98,6 +98,7 @@ class VerifiableProducer(BackgroundThreadService):
self.worker_threads[self.idx(node) - 1].join() self.worker_threads[self.idx(node) - 1].join()
def clean_node(self, node): def clean_node(self, node):
node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False) node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
def try_parse_json(self, string): def try_parse_json(self, string):

View File

@ -16,6 +16,7 @@
from ducktape.services.service import Service from ducktape.services.service import Service
import subprocess
import time import time
@ -51,6 +52,17 @@ class ZookeeperService(Service):
time.sleep(5) # give it some time to start time.sleep(5) # give it some time to start
def pids(self, node):
try:
cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
return []
def alive(self, node):
return len(self.pids(node)) > 0
def stop_node(self, node): def stop_node(self, node):
idx = self.idx(node) idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname)) self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
@ -58,6 +70,10 @@ class ZookeeperService(Service):
def clean_node(self, node): def clean_node(self, node):
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname) self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
if self.alive(node):
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
(self.__class__.__name__, node.account))
node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False) node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
def connect_setting(self): def connect_setting(self):