MirrorMaker can now run as a multi-node service. Added `kill -9` to various clean_node methods.

This commit is contained in:
Geoff Anderson 2015-08-21 16:51:13 -07:00
parent 1e806f2a4c
commit c7c3ebdf2f
6 changed files with 52 additions and 27 deletions

View File

@ -43,12 +43,12 @@ class TestMirrorMakerService(Test):
max_messages=self.num_messages, throughput=1000)
# Use a regex whitelist to check that the start command is well-formed in this case
self.mirror_maker = MirrorMaker(test_context, source=self.source_kafka, target=self.target_kafka,
whitelist=".*", consumer_timeout_ms=10000)
self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
whitelist=".*", consumer_timeout_ms=2000)
# This will consume from target kafka cluster
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
consumer_timeout_ms=10000)
consumer_timeout_ms=1000)
def setUp(self):
# Source cluster
@ -64,7 +64,7 @@ class TestMirrorMakerService(Test):
Test end-to-end behavior under non-failure conditions.
Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
One is source, and the other is target.
One is source, and the other is target. Single-node mirror maker mirrors from source to target.
- Start mirror maker.
- Produce a small number of messages to the source cluster.
@ -73,7 +73,7 @@ class TestMirrorMakerService(Test):
"""
self.mirror_maker.start()
# Check that consumer_timeout_ms setting made it to config file
self.mirror_maker.node.account.ssh(
self.mirror_maker.nodes[0].account.ssh(
"grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)
self.producer.start()
@ -82,7 +82,8 @@ class TestMirrorMakerService(Test):
self.consumer.wait()
num_consumed = len(self.consumer.messages_consumed[1])
num_produced = self.num_messages
num_produced = self.producer.num_acked
assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)
self.mirror_maker.stop()

View File

@ -186,5 +186,9 @@ class ConsoleConsumer(BackgroundThreadService):
err_msg="Timed out waiting for consumer to stop.")
def clean_node(self, node):
    """Scrub this node: hard-kill any surviving consumer JVM, then delete its persistent files."""
    account = node.account
    if self.alive(node):
        # A live process here means the normal stop path did not finish; warn, then force-kill.
        self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
                         (self.__class__.__name__, account))
        account.kill_process("java", clean_shutdown=False, allow_fail=True)
    account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)

View File

@ -93,6 +93,7 @@ class KafkaService(Service):
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
def clean_node(self, node):
    """Force-kill the broker process on this node and remove all of its on-disk state."""
    account = node.account
    # clean_shutdown=False requests a forced (non-graceful) kill; allow_fail tolerates an already-dead broker.
    account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
    account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
def create_topic(self, topic_cfg):

View File

@ -71,7 +71,7 @@ class MirrorMaker(Service):
"collect_default": True}
}
def __init__(self, context, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
"""
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
@ -81,27 +81,34 @@ class MirrorMaker(Service):
target: target Kafka cluster to which data will be mirrored
whitelist: whitelist regex for topics to mirror
blacklist: blacklist regex for topics not to mirror
num_streams: number of consumer threads to create
num_streams: number of consumer threads to create; can be a single int, or a list with
one value per node, allowing num_streams to be the same for each node,
or configured independently per-node
consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages
"""
super(MirrorMaker, self).__init__(context, num_nodes=1)
super(MirrorMaker, self).__init__(context, num_nodes=num_nodes)
self.consumer_timeout_ms = consumer_timeout_ms
self.num_streams = num_streams
if not isinstance(num_streams, int):
# if not an integer, num_streams should be configured per-node
assert len(num_streams) == num_nodes
self.whitelist = whitelist
self.blacklist = blacklist
self.source = source
self.target = target
@property
def start_cmd(self):
def start_cmd(self, node):
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
cmd += " --num.streams %d" % self.num_streams
if isinstance(self.num_streams, int):
cmd += " --num.streams %d" % self.num_streams
else:
# config num_streams separately on each node
cmd += " --num.streams %d" % self.num_streams[self.idx(node) - 1]
if self.whitelist is not None:
cmd += " --whitelist=\"%s\"" % self.whitelist
if self.blacklist is not None:
@ -109,14 +116,6 @@ class MirrorMaker(Service):
cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
return cmd
@property
def node(self):
"""Convenience method since this Service only ever has one node"""
if self.nodes:
return self.nodes[0]
else:
return None
def pids(self, node):
try:
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
@ -125,8 +124,8 @@ class MirrorMaker(Service):
except (subprocess.CalledProcessError, ValueError) as e:
return []
def alive(self):
return len(self.pids(self.node)) > 0
def alive(self, node):
return len(self.pids(node)) > 0
def start_node(self, node):
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
@ -146,18 +145,21 @@ class MirrorMaker(Service):
node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
# Run mirror maker
cmd = self.start_cmd
cmd = self.start_cmd(node)
self.logger.debug("Mirror maker command: %s", cmd)
node.account.ssh(cmd, allow_fail=False)
wait_until(lambda: self.alive(), timeout_sec=10, backoff_sec=.5,
wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took to long to start.")
self.logger.debug("Mirror maker is alive")
def stop_node(self, node):
node.account.kill_process("java", allow_fail=True)
wait_until(lambda: not self.alive(), timeout_sec=10, backoff_sec=.5,
wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took to long to stop.")
def clean_node(self, node):
    """Ensure no mirror maker process survives on this node, then wipe its working directory."""
    account = node.account
    if self.alive(node):
        # Process outlived the normal stop sequence; escalate to a forced kill.
        self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
                         (self.__class__.__name__, account))
        account.kill_process("java", clean_shutdown=False, allow_fail=True)
    account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)

View File

@ -98,6 +98,7 @@ class VerifiableProducer(BackgroundThreadService):
self.worker_threads[self.idx(node) - 1].join()
def clean_node(self, node):
    """Hard-kill the producer process on this node and delete its log file."""
    account = node.account
    account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
    account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
def try_parse_json(self, string):

View File

@ -16,6 +16,7 @@
from ducktape.services.service import Service
import subprocess
import time
@ -51,6 +52,17 @@ class ZookeeperService(Service):
time.sleep(5) # give it some time to start
def pids(self, node):
    """Return the pids of zookeeper-related java processes running on `node`.

    Returns an empty list when the remote command fails or emits
    non-integer output (e.g. no matching process).
    """
    try:
        cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
        # callback=int converts each captured line of ps output into a pid;
        # list(...) replaces the original no-op identity comprehension.
        return list(node.account.ssh_capture(cmd, allow_fail=True, callback=int))
    except (subprocess.CalledProcessError, ValueError):
        # Failure to enumerate processes is treated as "no pids found".
        return []
def alive(self, node):
    """Return True iff at least one zookeeper process is currently running on `node`."""
    # A non-empty pid list means the service is still up.
    return bool(self.pids(node))
def stop_node(self, node):
idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
@ -58,6 +70,10 @@ class ZookeeperService(Service):
def clean_node(self, node):
    """Force-kill any surviving zookeeper process on this node, then delete its data and logs."""
    account = node.account
    self.logger.info("Cleaning ZK node %d on %s", self.idx(node), account.hostname)
    if self.alive(node):
        # Still running at cleanup time: the graceful stop failed, so kill forcefully.
        self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
                         (self.__class__.__name__, account))
        account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True)
    account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
def connect_setting(self):