Added MirrorMaker service and a few corresponding sanity checks, as well as necessary config template files. A few additional updates to accommodate the change in wait_until from ducktape 0.2.0 -> 0.3.0.
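For context on that last point: in ducktape 0.2.0, wait_until returned a boolean the caller had to check, while in 0.3.0 it raises on timeout and accepts an err_msg. A minimal before/after sketch (the condition is a stand-in for a real check such as consumer.alive(node)):

import time
from ducktape.utils.util import wait_until

start = time.time()
condition = lambda: time.time() - start > 1  # stand-in: becomes true after ~1s

# ducktape 0.2.0 style: inspect the boolean return value and raise manually.
# if not wait_until(condition, timeout_sec=10, backoff_sec=.2):
#     raise Exception("Condition was too slow")

# ducktape 0.3.0 style: wait_until raises on timeout, carrying err_msg.
wait_until(condition, timeout_sec=10, backoff_sec=.2,
           err_msg="Condition was too slow")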

Geoff Anderson 2015-08-18 01:15:00 -07:00
parent 786867c2e1
commit 1b4b04935e
11 changed files with 342 additions and 94 deletions

View File

@@ -15,6 +15,7 @@
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService

@@ -61,20 +62,21 @@ class ConsoleConsumerTest(Test):
         self.consumer.start()
         node = self.consumer.nodes[0]
-        if not wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
-            raise Exception("Consumer was too slow to start")
+        wait_until(lambda: self.consumer.alive(node),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
         self.logger.info("consumer started in %s seconds " % str(time.time() - t0))

         # Verify that log output is happening
-        if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10):
-            raise Exception("Timed out waiting for log file to exist")
+        wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10,
+                   err_msg="Timed out waiting for logging to start.")
         assert line_count(node, ConsoleConsumer.LOG_FILE) > 0

         # Verify no consumed messages
         assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0

         self.consumer.stop_node(node)
-        if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
-            raise Exception("Took too long for consumer to die.")
+        wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2,
+                   err_msg="Timed out waiting for consumer to stop.")

View File

@@ -0,0 +1,98 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.mirror_maker import MirrorMaker


class TestMirrorMakerService(Test):
    """Sanity checks on mirror maker service class."""
    def __init__(self, test_context):
        super(TestMirrorMakerService, self).__init__(test_context)

        self.topic = "topic"
        self.zk1 = ZookeeperService(test_context, num_nodes=1)
        self.zk2 = ZookeeperService(test_context, num_nodes=1)
        self.k1 = KafkaService(test_context, num_nodes=1, zk=self.zk1,
                               topics={self.topic: {"partitions": 1, "replication-factor": 1}})
        self.k2 = KafkaService(test_context, num_nodes=1, zk=self.zk2,
                               topics={self.topic: {"partitions": 1, "replication-factor": 1}})

        self.num_messages = 1000
        # This will produce to the source kafka cluster
        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.k1, topic=self.topic,
                                           max_messages=self.num_messages, throughput=1000)
        self.mirror_maker = MirrorMaker(test_context, sources=[self.k1], target=self.k2, whitelist=self.topic)
        # This will consume from the target kafka cluster
        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.k2, topic=self.topic,
                                        consumer_timeout_ms=10000)

    def setUp(self):
        # Source cluster
        self.zk1.start()
        self.k1.start()

        # Target cluster
        self.zk2.start()
        self.k2.start()

    def test_lifecycle(self):
        """Start and stop a single-node MirrorMaker and validate that the process appears and disappears in a
        reasonable amount of time.
        """
        self.mirror_maker.start()
        node = self.mirror_maker.nodes[0]
        wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
                   err_msg="Mirror maker took too long to start.")

        self.mirror_maker.stop()
        wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
                   err_msg="Mirror maker took too long to stop.")

    def test_end_to_end(self):
        """
        Test end-to-end behavior under non-failure conditions.

        Setup: two single-node Kafka clusters, each connected to its own single-node zookeeper cluster.
        One is source, and the other is target.

        - Start mirror maker.
        - Produce a small number of messages to the source cluster.
        - Consume messages from target.
        - Confirm that the number of consumed messages matches the number produced.
        """
        self.mirror_maker.start()
        node = self.mirror_maker.nodes[0]
        wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
                   err_msg="Mirror maker took too long to start.")

        self.producer.start()
        self.producer.wait()
        self.consumer.start()
        self.consumer.wait()

        assert len(self.consumer.messages_consumed[1]) == self.num_messages

        self.mirror_maker.stop()
        wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
                   err_msg="Mirror maker took too long to stop.")

View File

@@ -91,7 +91,7 @@ class ConsoleConsumer(BackgroundThreadService):
             "collect_default": True}
     }

-    def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
+    def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
         """
         Args:
             context: standard context

@@ -161,7 +161,7 @@ class ConsoleConsumer(BackgroundThreadService):
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)

         # Create and upload log properties
-        log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
+        log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
         node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)

@@ -169,7 +169,8 @@ class ConsoleConsumer(BackgroundThreadService):
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
         for line in node.account.ssh_capture(cmd, allow_fail=False):
             msg = line.strip()
-            msg = self.message_validator(msg)
+            if self.message_validator is not None:
+                msg = self.message_validator(msg)
             if msg is not None:
                 self.logger.debug("consumed a message: " + str(msg))
                 self.messages_consumed[idx].append(msg)
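The validator contract implied above: a message_validator takes the raw line read from the consumer's stdout and returns the parsed message, or None to skip it. A sketch along the lines of the is_int helper that replication_test.py imports below (illustrative; the real helper lives in console_consumer.py):

def is_int(msg):
    """Return msg parsed as an int, or None if it cannot be parsed.

    Messages for which the validator returns None are dropped by the
    consumer loop rather than appended to messages_consumed.
    """
    try:
        return int(msg)
    except ValueError:
        return None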

View File

@@ -0,0 +1,173 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService

import os

"""
0.8.2.1 MirrorMaker options

Option                                  Description
------                                  -----------
--abort.on.send.failure <Stop the       Configure the mirror maker to exit on
  entire mirror maker when a send         a failed send. (default: true)
  failure occurs>
--blacklist <Java regex (String)>       Blacklist of topics to mirror.
--consumer.config <config file>         Embedded consumer config for consuming
                                          from the source cluster.
--consumer.rebalance.listener <A        The consumer rebalance listener to use
  custom rebalance listener of type       for mirror maker consumer.
  ConsumerRebalanceListener>
--help                                  Print this message.
--message.handler <A custom message     Message handler which will process
  handler of type                         every record in-between consumer and
  MirrorMakerMessageHandler>              producer.
--message.handler.args <Arguments       Arguments used by custom rebalance
  passed to message handler               listener for mirror maker consumer
  constructor.>
--num.streams <Integer: Number of       Number of consumption streams.
  threads>                                (default: 1)
--offset.commit.interval.ms <Integer:   Offset commit interval in ms (default:
  offset commit interval in               60000)
  millisecond>
--producer.config <config file>         Embedded producer config.
--rebalance.listener.args <Arguments    Arguments used by custom rebalance
  passed to custom rebalance listener     listener for mirror maker consumer
  constructor as a string.>
--whitelist <Java regex (String)>       Whitelist of topics to mirror.
"""

"""
consumer config:
zookeeper.connect={{ zookeeper_connect }}
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
group.id={{ group_id|default('test-consumer-group') }}
{% if consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}
"""

"""
producer config:
metadata.broker.list={{ metadata_broker_list }}
producer.type={{ producer_type }}  (sync or async)
"""


class MirrorMaker(BackgroundThreadService):

    # Root directory for persistent output
    PERSISTENT_ROOT = "/mnt/mirror_maker"
    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
    LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
    PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
    KAFKA_HOME = "/opt/kafka"

    logs = {
        "mirror_maker_log": {
            "path": LOG_FILE,
            "collect_default": True}
    }

    def __init__(self, context, sources, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
        """
        MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.

        Args:
            context: standard context
            sources: list of one or more source kafka clusters
            target: target cluster
            whitelist: whitelist regex for topics to mirror
            blacklist: blacklist regex for topics not to mirror
            num_streams: number of consumer threads to create
            consumer_timeout_ms: consumer stops if more than consumer_timeout_ms elapses between consecutive messages
        """
        super(MirrorMaker, self).__init__(context, num_nodes=1)

        self.consumer_timeout_ms = consumer_timeout_ms
        self.num_streams = num_streams
        self.whitelist = whitelist
        self.blacklist = blacklist
        self.sources = sources
        self.target = target

    def consumer_config_file(self, kafka):
        # One consumer config file per source cluster, named after its zookeeper connect setting
        return "consumer%s.properties" % kafka.zk.connect_setting()

    @property
    def start_cmd(self):
        cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
        cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
        for kafka in self.sources:
            # One consumer config file per source kafka cluster
            cmd += " --consumer.config %s" % self.consumer_config_file(kafka)
        cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
        cmd += " --num.streams %d" % self.num_streams
        if self.whitelist is not None:
            cmd += " --whitelist=%s" % self.whitelist
        if self.blacklist is not None:
            cmd += " --blacklist=%s" % self.blacklist
        # Append both stdout and stderr to the log file, and run in the background
        cmd += " 1>>%s 2>&1 &" % MirrorMaker.LOG_FILE
        return cmd

    def pids(self, node):
        try:
            cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
            return list(node.account.ssh_capture(cmd, allow_fail=True, callback=int))
        except Exception:
            # No pids could be found, e.g. because the process is not running
            return []

    def alive(self, node):
        return len(self.pids(node)) > 0

    def _worker(self, idx, node):
        node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
        node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)

        # Create and upload one consumer config file per source kafka cluster
        for kafka in self.sources:
            consumer_props = self.render('consumer.properties', zookeeper_connect=kafka.zk.connect_setting(),
                                         consumer_timeout_ms=self.consumer_timeout_ms)
            node.account.create_file(self.consumer_config_file(kafka), consumer_props)

        # Create and upload producer properties file for target cluster
        producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(),
                                     producer_type="async")
        node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props)

        # Create and upload log properties
        log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE)
        node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)

        # Run mirror maker
        cmd = self.start_cmd
        self.logger.debug("Mirror maker %d command: %s", idx, cmd)
        node.account.ssh(cmd, allow_fail=False)

    def stop_node(self, node):
        node.account.kill_process("java", allow_fail=True)

    def clean_node(self, node):
        node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)

View File

@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details
zookeeper.connect={{ zookeeper_connect }}
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
group.id={{ group_id|default('test-consumer-group') }}
{% if consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}
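Since the consumer.timeout.ms lines are guarded by a Jinja2 conditional, a quick way to sanity-check this template is to render it directly; ducktape's render() is Jinja2-based. A minimal sketch (file path and values are illustrative):

from jinja2 import Template

with open("consumer.properties") as f:
    template = Template(f.read())

# consumer.timeout.ms is emitted only when consumer_timeout_ms is not None,
# mirroring how MirrorMaker._worker() passes self.consumer_timeout_ms through.
print(template.render(zookeeper_connect="worker1:2181", consumer_timeout_ms=10000))
print(template.render(zookeeper_connect="worker1:2181", consumer_timeout_ms=None))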

View File

@@ -14,108 +14,28 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
-############################# Server Basics #############################
-# The id of the broker. This must be set to a unique integer for each broker.
 broker.id={{ broker_id }}
-############################# Socket Server Settings #############################
-# The port the socket server listens on
 port=9092
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
 #host.name=localhost
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured. Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
 advertised.host.name={{ node.account.hostname }}
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
 #advertised.port=<port accessible by clients>
-# The number of threads handling network requests
 num.network.threads=3
-# The number of threads doing disk I/O
 num.io.threads=8
-# The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=102400
-# The receive buffer (SO_RCVBUF) used by the socket server
 socket.receive.buffer.bytes=65536
-# The maximum size of a request that the socket server will accept (protection against OOM)
 socket.request.max.bytes=104857600
-############################# Log Basics #############################
-# A comma seperated list of directories under which to store log files
 log.dirs=/mnt/kafka-logs
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
 num.partitions=1
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
 num.recovery.threads.per.data.dir=1
-############################# Log Flush Policy #############################
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data may be lost if you are not using replication.
-#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-# The number of messages to accept before forcing a flush of data to disk
 #log.flush.interval.messages=10000
-# The maximum amount of time a message can sit in a log before we force a flush
 #log.flush.interval.ms=1000
-############################# Log Retention Policy #############################
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-# The minimum age of a log file to be eligible for deletion
 log.retention.hours=168
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
 #log.retention.bytes=1073741824
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
 log.segment.bytes=1073741824
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
 log.retention.check.interval.ms=300000
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
 log.cleaner.enable=false
-############################# Zookeeper #############################
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
 zookeeper.connect={{ zk.connect_setting() }}
-# Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=2000

View File

@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details
metadata.broker.list={{ broker_list }}
bootstrap.servers={{ broker_list }}

# sync or async
producer.type={{ producer_type }}
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
#partitioner.class=
#compressed.topics=
#queue.buffering.max.ms=
#queue.buffering.max.messages=
#queue.enqueue.timeout.ms=
#batch.num.messages=

View File

@@ -89,6 +89,9 @@ class VerifiableProducer(BackgroundThreadService):
     def stop_node(self, node):
         node.account.kill_process("VerifiableProducer", allow_fail=False)

+        if self.worker_threads is None:
+            return
+
         # block until the corresponding thread exits
         if len(self.worker_threads) >= self.idx(node):
             # Need to guard this because stop is preemptively called before the worker threads are added and started

View File

@@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int

 import signal
 import time

@@ -76,12 +76,12 @@ class ReplicationTest(Test):
         """
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int)

         # Produce in a background thread while driving broker failures
         self.producer.start()
-        if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
-            raise RuntimeError("Producer failed to start in a reasonable amount of time.")
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
+                   err_msg="Producer failed to start in a reasonable amount of time.")

         failure()
         self.producer.stop()

View File

@@ -23,5 +23,5 @@ setup(name="kafkatest",
       platforms=["any"],
       license="apache2.0",
       packages=find_packages(),
-      requires=["ducktape(>=0.2.0)"]
+      requires=["ducktape(==0.3.0)"]
       )