mirror of https://github.com/apache/kafka.git
MirrorMaker now can run as multi-node service. Added kill -9 to various clean_node methods.
This commit is contained in:
parent
1e806f2a4c
commit
c7c3ebdf2f
|
@ -43,12 +43,12 @@ class TestMirrorMakerService(Test):
|
|||
max_messages=self.num_messages, throughput=1000)
|
||||
|
||||
# Use a regex whitelist to check that the start command is well-formed in this case
|
||||
self.mirror_maker = MirrorMaker(test_context, source=self.source_kafka, target=self.target_kafka,
|
||||
whitelist=".*", consumer_timeout_ms=10000)
|
||||
self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
|
||||
whitelist=".*", consumer_timeout_ms=2000)
|
||||
|
||||
# This will consume from target kafka cluster
|
||||
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
|
||||
consumer_timeout_ms=10000)
|
||||
consumer_timeout_ms=1000)
|
||||
|
||||
def setUp(self):
|
||||
# Source cluster
|
||||
|
@ -64,7 +64,7 @@ class TestMirrorMakerService(Test):
|
|||
Test end-to-end behavior under non-failure conditions.
|
||||
|
||||
Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
|
||||
One is source, and the other is target.
|
||||
One is source, and the other is target. Single-node mirror maker mirrors from source to target.
|
||||
|
||||
- Start mirror maker.
|
||||
- Produce a small number of messages to the source cluster.
|
||||
|
@ -73,7 +73,7 @@ class TestMirrorMakerService(Test):
|
|||
"""
|
||||
self.mirror_maker.start()
|
||||
# Check that consumer_timeout_ms setting made it to config file
|
||||
self.mirror_maker.node.account.ssh(
|
||||
self.mirror_maker.nodes[0].account.ssh(
|
||||
"grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)
|
||||
|
||||
self.producer.start()
|
||||
|
@ -82,7 +82,8 @@ class TestMirrorMakerService(Test):
|
|||
self.consumer.wait()
|
||||
|
||||
num_consumed = len(self.consumer.messages_consumed[1])
|
||||
num_produced = self.num_messages
|
||||
num_produced = self.producer.num_acked
|
||||
assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
|
||||
assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)
|
||||
|
||||
self.mirror_maker.stop()
|
||||
|
|
|
@ -186,5 +186,9 @@ class ConsoleConsumer(BackgroundThreadService):
|
|||
err_msg="Timed out waiting for consumer to stop.")
|
||||
|
||||
def clean_node(self, node):
|
||||
if self.alive(node):
|
||||
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
|
||||
(self.__class__.__name__, node.account))
|
||||
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
|
||||
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
||||
|
|
|
@ -93,6 +93,7 @@ class KafkaService(Service):
|
|||
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
|
||||
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
|
||||
|
||||
def create_topic(self, topic_cfg):
|
||||
|
|
|
@ -71,7 +71,7 @@ class MirrorMaker(Service):
|
|||
"collect_default": True}
|
||||
}
|
||||
|
||||
def __init__(self, context, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
|
||||
def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
|
||||
"""
|
||||
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
|
||||
|
||||
|
@ -81,27 +81,34 @@ class MirrorMaker(Service):
|
|||
target: target Kafka cluster to which data will be mirrored
|
||||
whitelist: whitelist regex for topics to mirror
|
||||
blacklist: blacklist regex for topics not to mirror
|
||||
num_streams: number of consumer threads to create
|
||||
num_streams: number of consumer threads to create; can be a single int, or a list with
|
||||
one value per node, allowing num_streams to be the same for each node,
|
||||
or configured independently per-node
|
||||
consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages
|
||||
"""
|
||||
super(MirrorMaker, self).__init__(context, num_nodes=1)
|
||||
super(MirrorMaker, self).__init__(context, num_nodes=num_nodes)
|
||||
|
||||
self.consumer_timeout_ms = consumer_timeout_ms
|
||||
self.num_streams = num_streams
|
||||
if not isinstance(num_streams, int):
|
||||
# if not an integer, num_streams should be configured per-node
|
||||
assert len(num_streams) == num_nodes
|
||||
self.whitelist = whitelist
|
||||
self.blacklist = blacklist
|
||||
self.source = source
|
||||
self.target = target
|
||||
|
||||
@property
|
||||
def start_cmd(self):
|
||||
def start_cmd(self, node):
|
||||
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
|
||||
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
|
||||
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
|
||||
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
|
||||
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
|
||||
cmd += " --num.streams %d" % self.num_streams
|
||||
|
||||
if isinstance(self.num_streams, int):
|
||||
cmd += " --num.streams %d" % self.num_streams
|
||||
else:
|
||||
# config num_streams separately on each node
|
||||
cmd += " --num.streams %d" % self.num_streams[self.idx(node) - 1]
|
||||
if self.whitelist is not None:
|
||||
cmd += " --whitelist=\"%s\"" % self.whitelist
|
||||
if self.blacklist is not None:
|
||||
|
@ -109,14 +116,6 @@ class MirrorMaker(Service):
|
|||
cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
|
||||
return cmd
|
||||
|
||||
@property
|
||||
def node(self):
|
||||
"""Convenience method since this Service only ever has one node"""
|
||||
if self.nodes:
|
||||
return self.nodes[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def pids(self, node):
|
||||
try:
|
||||
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
|
||||
|
@ -125,8 +124,8 @@ class MirrorMaker(Service):
|
|||
except (subprocess.CalledProcessError, ValueError) as e:
|
||||
return []
|
||||
|
||||
def alive(self):
|
||||
return len(self.pids(self.node)) > 0
|
||||
def alive(self, node):
|
||||
return len(self.pids(node)) > 0
|
||||
|
||||
def start_node(self, node):
|
||||
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
@ -146,18 +145,21 @@ class MirrorMaker(Service):
|
|||
node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
|
||||
|
||||
# Run mirror maker
|
||||
cmd = self.start_cmd
|
||||
cmd = self.start_cmd(node)
|
||||
self.logger.debug("Mirror maker command: %s", cmd)
|
||||
node.account.ssh(cmd, allow_fail=False)
|
||||
wait_until(lambda: self.alive(), timeout_sec=10, backoff_sec=.5,
|
||||
wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
|
||||
err_msg="Mirror maker took to long to start.")
|
||||
self.logger.debug("Mirror maker is alive")
|
||||
|
||||
def stop_node(self, node):
|
||||
node.account.kill_process("java", allow_fail=True)
|
||||
wait_until(lambda: not self.alive(), timeout_sec=10, backoff_sec=.5,
|
||||
wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
|
||||
err_msg="Mirror maker took to long to stop.")
|
||||
|
||||
def clean_node(self, node):
|
||||
if self.alive(node):
|
||||
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
|
||||
(self.__class__.__name__, node.account))
|
||||
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
|
||||
node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
||||
|
|
|
@ -98,6 +98,7 @@ class VerifiableProducer(BackgroundThreadService):
|
|||
self.worker_threads[self.idx(node) - 1].join()
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
|
||||
node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
|
||||
|
||||
def try_parse_json(self, string):
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
from ducktape.services.service import Service
|
||||
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
|
||||
|
@ -51,6 +52,17 @@ class ZookeeperService(Service):
|
|||
|
||||
time.sleep(5) # give it some time to start
|
||||
|
||||
def pids(self, node):
|
||||
try:
|
||||
cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
|
||||
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
|
||||
return pid_arr
|
||||
except (subprocess.CalledProcessError, ValueError) as e:
|
||||
return []
|
||||
|
||||
def alive(self, node):
|
||||
return len(self.pids(node)) > 0
|
||||
|
||||
def stop_node(self, node):
|
||||
idx = self.idx(node)
|
||||
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
|
||||
|
@ -58,6 +70,10 @@ class ZookeeperService(Service):
|
|||
|
||||
def clean_node(self, node):
|
||||
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
|
||||
if self.alive(node):
|
||||
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
|
||||
(self.__class__.__name__, node.account))
|
||||
node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True)
|
||||
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
|
||||
|
||||
def connect_setting(self):
|
||||
|
|
Loading…
Reference in New Issue