Various cleanups per review.

Geoff Anderson 2015-08-20 13:42:04 -07:00
parent 1b4b04935e
commit 1e806f2a4c
6 changed files with 67 additions and 84 deletions

View File

@@ -15,7 +15,6 @@
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -75,8 +74,7 @@ class ConsoleConsumerTest(Test):
         assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
         self.consumer.stop_node(node)
-        wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2,
-                   err_msg="Timed out waiting for consumer to stop.")
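
Note: the shutdown wait dropped from this test is not lost; it moves into ConsoleConsumer.stop_node later in this commit. For reference, a minimal sketch of the polling behavior the tests rely on from ducktape's wait_until, inferred only from how it is called here (the real helper's body and the exception it raises may differ):

import time

def wait_until(condition, timeout_sec, backoff_sec=1, err_msg=""):
    # Poll `condition` every `backoff_sec` seconds until it returns True,
    # or give up with `err_msg` once `timeout_sec` has elapsed.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise AssertionError(err_msg)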

View File

@@ -15,7 +15,6 @@
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -25,50 +24,40 @@ from kafkatest.services.mirror_maker import MirrorMaker
 class TestMirrorMakerService(Test):
-    """Sanity checks on console consumer service class."""
+    """Sanity checks on mirror maker service class."""

     def __init__(self, test_context):
         super(TestMirrorMakerService, self).__init__(test_context)

         self.topic = "topic"
-        self.zk1 = ZookeeperService(test_context, num_nodes=1)
+        self.source_zk = ZookeeperService(test_context, num_nodes=1)
-        self.zk2 = ZookeeperService(test_context, num_nodes=1)
+        self.target_zk = ZookeeperService(test_context, num_nodes=1)
-        self.k1 = KafkaService(test_context, num_nodes=1, zk=self.zk1,
+        self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
                                topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-        self.k2 = KafkaService(test_context, num_nodes=1, zk=self.zk2,
+        self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
                                topics={self.topic: {"partitions": 1, "replication-factor": 1}})

         self.num_messages = 1000
         # This will produce to source kafka cluster
-        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.k1, topic=self.topic,
+        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
                                            max_messages=self.num_messages, throughput=1000)
-        self.mirror_maker = MirrorMaker(test_context, sources=[self.k1], target=self.k2, whitelist=self.topic)
+        # Use a regex whitelist to check that the start command is well-formed in this case
+        self.mirror_maker = MirrorMaker(test_context, source=self.source_kafka, target=self.target_kafka,
+                                        whitelist=".*", consumer_timeout_ms=10000)
         # This will consume from target kafka cluster
-        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.k2, topic=self.topic,
+        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
                                         consumer_timeout_ms=10000)

     def setUp(self):
         # Source cluster
-        self.zk1.start()
+        self.source_zk.start()
-        self.k1.start()
+        self.source_kafka.start()
         # Target cluster
-        self.zk2.start()
+        self.target_zk.start()
-        self.k2.start()
+        self.target_kafka.start()

-    def test_lifecycle(self):
-        """Start and stop a single-node MirrorMaker and validate that the process appears and disappears in a
-        reasonable amount of time.
-        """
-        self.mirror_maker.start()
-        node = self.mirror_maker.nodes[0]
-        wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
-                   err_msg="Mirror maker took too long to start.")
-        self.mirror_maker.stop()
-        wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
-                   err_msg="Mirror maker took to long to stop.")

     def test_end_to_end(self):
         """
@@ -80,19 +69,21 @@ class TestMirrorMakerService(Test):
         - Start mirror maker.
         - Produce a small number of messages to the source cluster.
         - Consume messages from target.
-        - Confirm that number of consumed messages matches the number produced.
+        - Verify that number of consumed messages matches the number produced.
         """
         self.mirror_maker.start()
-        node = self.mirror_maker.nodes[0]
-        wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
-                   err_msg="Mirror maker took too long to start.")
+        # Check that consumer_timeout_ms setting made it to config file
+        self.mirror_maker.node.account.ssh(
+            "grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)

         self.producer.start()
         self.producer.wait()
         self.consumer.start()
         self.consumer.wait()

-        assert len(self.consumer.messages_consumed[1]) == self.num_messages
+        num_consumed = len(self.consumer.messages_consumed[1])
+        num_produced = self.num_messages
+        assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)

         self.mirror_maker.stop()
-        wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
-                   err_msg="Mirror maker took to long to stop.")
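
Note: the grep assertion above checks that the consumer_timeout_ms constructor argument actually reaches the consumer config uploaded to the mirror maker node. Going by the consumer.properties template touched at the end of this commit, the rendered file it greps should look roughly like the following (the zookeeper connect string depends on the source cluster and is shown as a placeholder; the other values are the template defaults):

zookeeper.connect=<source-zookeeper-connect-string>
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
consumer.timeout.ms=10000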

View File

@@ -14,8 +14,10 @@
 # limitations under the License.

 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until

 import os
+import subprocess

 def is_int(msg):
@@ -141,7 +143,7 @@ class ConsoleConsumer(BackgroundThreadService):
             cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except:
+        except (subprocess.CalledProcessError, ValueError) as e:
             return []

     def alive(self, node):
@@ -180,6 +182,8 @@ class ConsoleConsumer(BackgroundThreadService):
     def stop_node(self, node):
         node.account.kill_process("java", allow_fail=True)
+        wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
+                   err_msg="Timed out waiting for consumer to stop.")

     def clean_node(self, node):
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
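
Note: narrowing the bare except clause to (subprocess.CalledProcessError, ValueError) reflects the two failures this code can plausibly hit, which is an assumption based on how ssh_capture is used above: the callback=int conversion failing on non-numeric output, and the remote command itself failing. A small local illustration of those two exception types, with subprocess standing in for the ssh call:

import subprocess

# ValueError: the int() callback fails when the captured output is not a pid.
try:
    int("not-a-pid")
except ValueError:
    pass

# CalledProcessError: the command itself exits non-zero; check_output is only
# a local stand-in here for ducktape's remote ssh_capture.
try:
    subprocess.check_output("exit 1", shell=True)
except subprocess.CalledProcessError:
    pass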

View File

@@ -14,9 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until

 import os
+import subprocess

 """
 0.8.2.1 MirrorMaker options
@@ -51,27 +53,8 @@ Option Description
 --whitelist <Java regex (String)> Whitelist of topics to mirror.
 """

-"""
-consumer config
-zookeeper.connect={{ zookeeper_connect }}
-zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
-group.id={{ group_id|default('test-consumer-group') }}
-{% if consumer_timeout_ms is not none %}
-consumer.timeout.ms={{ consumer_timeout_ms }}
-{% endif %}
-"""
-"""
-producer config
-metadata.broker.list={{ metadata_broker_list }}
-producer.type={{ producer_type }} # sync or async
-"""
-class MirrorMaker(BackgroundThreadService):
+class MirrorMaker(Service):

     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/mirror_maker"
@@ -79,6 +62,7 @@ class MirrorMaker(BackgroundThreadService):
     LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
+    CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
     KAFKA_HOME = "/opt/kafka/"

     logs = {
@@ -87,14 +71,14 @@ class MirrorMaker(BackgroundThreadService):
                       "collect_default": True}
     }

-    def __init__(self, context, sources, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
+    def __init__(self, context, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
         """
         MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.

         Args:
             context: standard context
-            sources: list of one or more source kafka clusters
+            source: source Kafka cluster
-            target: target cluster
+            target: target Kafka cluster to which data will be mirrored
             whitelist: whitelist regex for topics to mirror
             blacklist: blacklist regex for topics not to mirror
             num_streams: number of consumer threads to create
@@ -106,50 +90,51 @@ class MirrorMaker(BackgroundThreadService):
         self.num_streams = num_streams
         self.whitelist = whitelist
         self.blacklist = blacklist
-        self.sources = sources
+        self.source = source
         self.target = target

-    def consumer_config_file(self, kafka):
-        return "consumer%s.properties" % kafka.zk.connect_setting()

     @property
     def start_cmd(self):
         cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
         cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
-        for kafka in self.sources:
-            # One consumer config file per source kafka cluster
-            cmd += " --consumer.config %s" % self.consumer_config_file(kafka)
+        cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
         cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
         cmd += " --num.streams %d" % self.num_streams
         if self.whitelist is not None:
-            cmd += " --whitelist=%s" % self.whitelist
+            cmd += " --whitelist=\"%s\"" % self.whitelist
         if self.blacklist is not None:
-            cmd += " --blacklist=%s" % self.blacklist
+            cmd += " --blacklist=\"%s\"" % self.blacklist
-        cmd += " 2>&1 1>>%s &" % MirrorMaker.LOG_FILE
+        cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
         return cmd

+    @property
+    def node(self):
+        """Convenience method since this Service only ever has one node"""
+        if self.nodes:
+            return self.nodes[0]
+        else:
+            return None

     def pids(self, node):
         try:
             cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except:
+        except (subprocess.CalledProcessError, ValueError) as e:
             return []

-    def alive(self, node):
-        return len(self.pids(node)) > 0
+    def alive(self):
+        return len(self.pids(self.node)) > 0

-    def _worker(self, idx, node):
+    def start_node(self, node):
         node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
         node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)

-        # Create, upload one consumer config file per source kafka cluster
-        for kafka in self.sources:
-            consumer_props = self.render('consumer.properties', zookeeper_connect=kafka.zk.connect_setting(),
-                                         consumer_timeout_ms=self.consumer_timeout_ms)
-            node.account.create_file(self.consumer_config_file(kafka), consumer_props)
+        # Create, upload one consumer config file for source cluster
+        consumer_props = self.render('consumer.properties', zookeeper_connect=self.source.zk.connect_setting())
+        node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props)

         # Create, upload producer properties file for target cluster
         producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(),
@@ -162,11 +147,16 @@ class MirrorMaker(BackgroundThreadService):
         # Run mirror maker
         cmd = self.start_cmd
-        self.logger.debug("Mirror maker %d command: %s", idx, cmd)
+        self.logger.debug("Mirror maker command: %s", cmd)
         node.account.ssh(cmd, allow_fail=False)
+        wait_until(lambda: self.alive(), timeout_sec=10, backoff_sec=.5,
+                   err_msg="Mirror maker took to long to start.")
+        self.logger.debug("Mirror maker is alive")

     def stop_node(self, node):
         node.account.kill_process("java", allow_fail=True)
+        wait_until(lambda: not self.alive(), timeout_sec=10, backoff_sec=.5,
+                   err_msg="Mirror maker took to long to stop.")

     def clean_node(self, node):
         node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
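
Note: moving from BackgroundThreadService to Service changes how the service is driven. There is no longer a _worker thread per node; instead start_node launches the remote process and polls pids()/alive(), with stop_node and clean_node as the other hooks. The sketch below shows the hook-based lifecycle this code is written against; it is an assumption about how ducktape's Service dispatches to subclasses, not its actual implementation:

class Service(object):
    """Hedged sketch of the lifecycle contract MirrorMaker now implements."""

    def __init__(self, context, num_nodes):
        self.nodes = []  # the real framework allocates nodes from the cluster

    def start(self):
        for node in self.nodes:
            self.start_node(node)   # MirrorMaker: upload configs, launch java, wait until alive()

    def stop(self):
        for node in self.nodes:
            self.stop_node(node)    # MirrorMaker: kill java, wait until not alive()

    def clean(self):
        for node in self.nodes:
            self.clean_node(node)   # MirrorMaker: rm -rf /mnt/mirror_maker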

View File

@@ -14,6 +14,6 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults

-{% if consumer_timeout_ms is not none %}
+{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
 consumer.timeout.ms={{ consumer_timeout_ms }}
 {% endif %}

View File

@@ -18,6 +18,6 @@ zookeeper.connect={{ zookeeper_connect }}
 zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
 group.id={{ group_id|default('test-consumer-group') }}

-{% if consumer_timeout_ms is not none %}
+{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
 consumer.timeout.ms={{ consumer_timeout_ms }}
 {% endif %}
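
Note: both template changes guard against rendering when no consumer_timeout_ms variable is supplied at all. With Jinja2's default Undefined, an undefined name still passes "is not none", so the old guard would emit a consumer.timeout.ms line with an empty value; adding "is defined" suppresses the line entirely. A minimal standalone illustration with plain Jinja2 (outside ducktape's render helper):

from jinja2 import Template

guarded = Template(
    "{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}"
    "consumer.timeout.ms={{ consumer_timeout_ms }}"
    "{% endif %}")

print(guarded.render())                           # prints nothing: line omitted when the variable is absent
print(guarded.render(consumer_timeout_ms=10000))  # prints "consumer.timeout.ms=10000"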