Various cleanups per review.

This commit is contained in:
Geoff Anderson 2015-08-20 13:42:04 -07:00
parent 1b4b04935e
commit 1e806f2a4c
6 changed files with 67 additions and 84 deletions

View File

@ -15,7 +15,6 @@
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -75,8 +74,7 @@ class ConsoleConsumerTest(Test):
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
self.consumer.stop_node(node)
wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2,
err_msg="Timed out waiting for consumer to stop.")

View File

@ -15,7 +15,6 @@
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -25,50 +24,40 @@ from kafkatest.services.mirror_maker import MirrorMaker
class TestMirrorMakerService(Test):
"""Sanity checks on console consumer service class."""
"""Sanity checks on mirror maker service class."""
def __init__(self, test_context):
super(TestMirrorMakerService, self).__init__(test_context)
self.topic = "topic"
self.zk1 = ZookeeperService(test_context, num_nodes=1)
self.zk2 = ZookeeperService(test_context, num_nodes=1)
self.source_zk = ZookeeperService(test_context, num_nodes=1)
self.target_zk = ZookeeperService(test_context, num_nodes=1)
self.k1 = KafkaService(test_context, num_nodes=1, zk=self.zk1,
self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.k2 = KafkaService(test_context, num_nodes=1, zk=self.zk2,
self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.num_messages = 1000
# This will produce to source kafka cluster
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.k1, topic=self.topic,
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
max_messages=self.num_messages, throughput=1000)
self.mirror_maker = MirrorMaker(test_context, sources=[self.k1], target=self.k2, whitelist=self.topic)
# Use a regex whitelist to check that the start command is well-formed in this case
self.mirror_maker = MirrorMaker(test_context, source=self.source_kafka, target=self.target_kafka,
whitelist=".*", consumer_timeout_ms=10000)
# This will consume from target kafka cluster
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.k2, topic=self.topic,
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
consumer_timeout_ms=10000)
def setUp(self):
# Source cluster
self.zk1.start()
self.k1.start()
self.source_zk.start()
self.source_kafka.start()
# Target cluster
self.zk2.start()
self.k2.start()
def test_lifecycle(self):
"""Start and stop a single-node MirrorMaker and validate that the process appears and disappears in a
reasonable amount of time.
"""
self.mirror_maker.start()
node = self.mirror_maker.nodes[0]
wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took too long to start.")
self.mirror_maker.stop()
wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took to long to stop.")
self.target_zk.start()
self.target_kafka.start()
def test_end_to_end(self):
"""
@ -80,19 +69,21 @@ class TestMirrorMakerService(Test):
- Start mirror maker.
- Produce a small number of messages to the source cluster.
- Consume messages from target.
- Confirm that number of consumed messages matches the number produced.
- Verify that number of consumed messages matches the number produced.
"""
self.mirror_maker.start()
node = self.mirror_maker.nodes[0]
wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took too long to start.")
# Check that consumer_timeout_ms setting made it to config file
self.mirror_maker.node.account.ssh(
"grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)
self.producer.start()
self.producer.wait()
self.consumer.start()
self.consumer.wait()
assert len(self.consumer.messages_consumed[1]) == self.num_messages
num_consumed = len(self.consumer.messages_consumed[1])
num_produced = self.num_messages
assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)
self.mirror_maker.stop()
wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took to long to stop.")

View File

@ -14,8 +14,10 @@
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
from ducktape.utils.util import wait_until
import os
import subprocess
def is_int(msg):
@ -141,7 +143,7 @@ class ConsoleConsumer(BackgroundThreadService):
cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except:
except (subprocess.CalledProcessError, ValueError) as e:
return []
def alive(self, node):
@ -180,6 +182,8 @@ class ConsoleConsumer(BackgroundThreadService):
def stop_node(self, node):
    """Kill the "java" process on ``node`` and wait for the consumer to disappear.

    Args:
        node: the service node whose console consumer should be stopped.

    The kill is best-effort (``allow_fail=True``); afterwards this polls
    ``self.alive(node)`` until it reports False. ``wait_until`` is presumably
    ducktape's helper that raises with ``err_msg`` once ``timeout_sec`` elapses.
    """
    node.account.kill_process("java", allow_fail=True)

    def consumer_stopped():
        # True once no console consumer process is reported on this node.
        return not self.alive(node)

    wait_until(consumer_stopped, timeout_sec=10, backoff_sec=.2,
               err_msg="Timed out waiting for consumer to stop.")
def clean_node(self, node):
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)

View File

@ -14,9 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
import os
import subprocess
"""
0.8.2.1 MirrorMaker options
@ -51,27 +53,8 @@ Option Description
--whitelist <Java regex (String)> Whitelist of topics to mirror.
"""
"""
consumer config
zookeeper.connect={{ zookeeper_connect }}
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
group.id={{ group_id|default('test-consumer-group') }}
{% if consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}
"""
"""
producer config
metadata.broker.list={{ metadata_broker_list }}
producer.type={{ producer_type }} # sync or async
"""
class MirrorMaker(BackgroundThreadService):
class MirrorMaker(Service):
# Root directory for persistent output
PERSISTENT_ROOT = "/mnt/mirror_maker"
@ -79,6 +62,7 @@ class MirrorMaker(BackgroundThreadService):
LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
KAFKA_HOME = "/opt/kafka/"
logs = {
@ -87,14 +71,14 @@ class MirrorMaker(BackgroundThreadService):
"collect_default": True}
}
def __init__(self, context, sources, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
def __init__(self, context, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
"""
MirrorMaker mirrors messages from a single source cluster to a single destination cluster.
Args:
context: standard context
sources: list of one or more source kafka clusters
target: target cluster
source: source Kafka cluster
target: target Kafka cluster to which data will be mirrored
whitelist: whitelist regex for topics to mirror
blacklist: blacklist regex for topics not to mirror
num_streams: number of consumer threads to create
@ -106,50 +90,51 @@ class MirrorMaker(BackgroundThreadService):
self.num_streams = num_streams
self.whitelist = whitelist
self.blacklist = blacklist
self.sources = sources
self.source = source
self.target = target
def consumer_config_file(self, kafka):
return "consumer%s.properties" % kafka.zk.connect_setting()
@property
def start_cmd(self):
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
for kafka in self.sources:
# One consumer config file per source kafka cluster
cmd += " --consumer.config %s" % self.consumer_config_file(kafka)
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
cmd += " --num.streams %d" % self.num_streams
if self.whitelist is not None:
cmd += " --whitelist=%s" % self.whitelist
cmd += " --whitelist=\"%s\"" % self.whitelist
if self.blacklist is not None:
cmd += " --blacklist=%s" % self.blacklist
cmd += " 2>&1 1>>%s &" % MirrorMaker.LOG_FILE
cmd += " --blacklist=\"%s\"" % self.blacklist
cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
return cmd
@property
def node(self):
    """Convenience accessor, since this Service only ever has one node.

    Returns:
        The first entry of ``self.nodes``, or None when ``self.nodes`` is empty.
    """
    return self.nodes[0] if self.nodes else None
def pids(self, node):
try:
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except:
except (subprocess.CalledProcessError, ValueError) as e:
return []
def alive(self, node):
return len(self.pids(node)) > 0
def alive(self):
return len(self.pids(self.node)) > 0
def _worker(self, idx, node):
def start_node(self, node):
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
# Create, upload one consumer config file per source kafka cluster
for kafka in self.sources:
consumer_props = self.render('consumer.properties', zookeeper_connect=kafka.zk.connect_setting(),
consumer_timeout_ms=self.consumer_timeout_ms)
node.account.create_file(self.consumer_config_file(kafka), consumer_props)
# Create, upload one consumer config file for source cluster
consumer_props = self.render('consumer.properties', zookeeper_connect=self.source.zk.connect_setting())
node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props)
# Create, upload producer properties file for target cluster
producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(),
@ -162,11 +147,16 @@ class MirrorMaker(BackgroundThreadService):
# Run mirror maker
cmd = self.start_cmd
self.logger.debug("Mirror maker %d command: %s", idx, cmd)
self.logger.debug("Mirror maker command: %s", cmd)
node.account.ssh(cmd, allow_fail=False)
wait_until(lambda: self.alive(), timeout_sec=10, backoff_sec=.5,
err_msg="Mirror maker took to long to start.")
self.logger.debug("Mirror maker is alive")
def stop_node(self, node):
    """Kill the "java" process on ``node`` and wait for mirror maker to exit.

    Args:
        node: the service node on which mirror maker is running.

    The kill is best-effort (``allow_fail=True``). Liveness is then polled
    through ``self.alive()``, which inspects ``self.node`` rather than the
    ``node`` argument. ``wait_until`` is presumably ducktape's helper that
    raises with ``err_msg`` once ``timeout_sec`` elapses.
    """
    node.account.kill_process("java", allow_fail=True)
    # Error message previously read "took to long"; corrected to "too long".
    wait_until(lambda: not self.alive(), timeout_sec=10, backoff_sec=.5,
               err_msg="Mirror maker took too long to stop.")
def clean_node(self, node):
node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)

View File

@ -14,6 +14,6 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
{% if consumer_timeout_ms is not none %}
{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}
{% endif %}

View File

@ -18,6 +18,6 @@ zookeeper.connect={{ zookeeper_connect }}
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
group.id={{ group_id|default('test-consumer-group') }}
{% if consumer_timeout_ms is not none %}
{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}