mirror of https://github.com/apache/kafka.git
Various cleanups per review.
This commit is contained in:
parent
1b4b04935e
commit
1e806f2a4c
|
@ -15,7 +15,6 @@
|
||||||
|
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from ducktape.utils.util import wait_until
|
from ducktape.utils.util import wait_until
|
||||||
from ducktape.mark import parametrize
|
|
||||||
|
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.services.kafka import KafkaService
|
from kafkatest.services.kafka import KafkaService
|
||||||
|
@ -75,8 +74,7 @@ class ConsoleConsumerTest(Test):
|
||||||
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
|
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
|
||||||
|
|
||||||
self.consumer.stop_node(node)
|
self.consumer.stop_node(node)
|
||||||
wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2,
|
|
||||||
err_msg="Timed out waiting for consumer to stop.")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
|
|
||||||
|
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from ducktape.utils.util import wait_until
|
|
||||||
|
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.services.kafka import KafkaService
|
from kafkatest.services.kafka import KafkaService
|
||||||
|
@ -25,50 +24,40 @@ from kafkatest.services.mirror_maker import MirrorMaker
|
||||||
|
|
||||||
|
|
||||||
class TestMirrorMakerService(Test):
|
class TestMirrorMakerService(Test):
|
||||||
"""Sanity checks on console consumer service class."""
|
"""Sanity checks on mirror maker service class."""
|
||||||
def __init__(self, test_context):
|
def __init__(self, test_context):
|
||||||
super(TestMirrorMakerService, self).__init__(test_context)
|
super(TestMirrorMakerService, self).__init__(test_context)
|
||||||
|
|
||||||
self.topic = "topic"
|
self.topic = "topic"
|
||||||
self.zk1 = ZookeeperService(test_context, num_nodes=1)
|
self.source_zk = ZookeeperService(test_context, num_nodes=1)
|
||||||
self.zk2 = ZookeeperService(test_context, num_nodes=1)
|
self.target_zk = ZookeeperService(test_context, num_nodes=1)
|
||||||
|
|
||||||
self.k1 = KafkaService(test_context, num_nodes=1, zk=self.zk1,
|
self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
|
||||||
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
|
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
|
||||||
self.k2 = KafkaService(test_context, num_nodes=1, zk=self.zk2,
|
self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
|
||||||
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
|
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
|
||||||
|
|
||||||
self.num_messages = 1000
|
self.num_messages = 1000
|
||||||
# This will produce to source kafka cluster
|
# This will produce to source kafka cluster
|
||||||
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.k1, topic=self.topic,
|
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
|
||||||
max_messages=self.num_messages, throughput=1000)
|
max_messages=self.num_messages, throughput=1000)
|
||||||
self.mirror_maker = MirrorMaker(test_context, sources=[self.k1], target=self.k2, whitelist=self.topic)
|
|
||||||
|
# Use a regex whitelist to check that the start command is well-formed in this case
|
||||||
|
self.mirror_maker = MirrorMaker(test_context, source=self.source_kafka, target=self.target_kafka,
|
||||||
|
whitelist=".*", consumer_timeout_ms=10000)
|
||||||
|
|
||||||
# This will consume from target kafka cluster
|
# This will consume from target kafka cluster
|
||||||
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.k2, topic=self.topic,
|
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
|
||||||
consumer_timeout_ms=10000)
|
consumer_timeout_ms=10000)
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
# Source cluster
|
# Source cluster
|
||||||
self.zk1.start()
|
self.source_zk.start()
|
||||||
self.k1.start()
|
self.source_kafka.start()
|
||||||
|
|
||||||
# Target cluster
|
# Target cluster
|
||||||
self.zk2.start()
|
self.target_zk.start()
|
||||||
self.k2.start()
|
self.target_kafka.start()
|
||||||
|
|
||||||
def test_lifecycle(self):
|
|
||||||
"""Start and stop a single-node MirrorMaker and validate that the process appears and disappears in a
|
|
||||||
reasonable amount of time.
|
|
||||||
"""
|
|
||||||
self.mirror_maker.start()
|
|
||||||
node = self.mirror_maker.nodes[0]
|
|
||||||
wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
|
||||||
err_msg="Mirror maker took too long to start.")
|
|
||||||
|
|
||||||
self.mirror_maker.stop()
|
|
||||||
wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
|
||||||
err_msg="Mirror maker took to long to stop.")
|
|
||||||
|
|
||||||
def test_end_to_end(self):
|
def test_end_to_end(self):
|
||||||
"""
|
"""
|
||||||
|
@ -80,19 +69,21 @@ class TestMirrorMakerService(Test):
|
||||||
- Start mirror maker.
|
- Start mirror maker.
|
||||||
- Produce a small number of messages to the source cluster.
|
- Produce a small number of messages to the source cluster.
|
||||||
- Consume messages from target.
|
- Consume messages from target.
|
||||||
- Confirm that number of consumed messages matches the number produced.
|
- Verify that number of consumed messages matches the number produced.
|
||||||
"""
|
"""
|
||||||
self.mirror_maker.start()
|
self.mirror_maker.start()
|
||||||
node = self.mirror_maker.nodes[0]
|
# Check that consumer_timeout_ms setting made it to config file
|
||||||
wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
self.mirror_maker.node.account.ssh(
|
||||||
err_msg="Mirror maker took too long to start.")
|
"grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)
|
||||||
|
|
||||||
self.producer.start()
|
self.producer.start()
|
||||||
self.producer.wait()
|
self.producer.wait()
|
||||||
self.consumer.start()
|
self.consumer.start()
|
||||||
self.consumer.wait()
|
self.consumer.wait()
|
||||||
assert len(self.consumer.messages_consumed[1]) == self.num_messages
|
|
||||||
|
num_consumed = len(self.consumer.messages_consumed[1])
|
||||||
|
num_produced = self.num_messages
|
||||||
|
assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)
|
||||||
|
|
||||||
self.mirror_maker.stop()
|
self.mirror_maker.stop()
|
||||||
wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
|
||||||
err_msg="Mirror maker took to long to stop.")
|
|
||||||
|
|
|
@ -14,8 +14,10 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ducktape.services.background_thread import BackgroundThreadService
|
from ducktape.services.background_thread import BackgroundThreadService
|
||||||
|
from ducktape.utils.util import wait_until
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
def is_int(msg):
|
def is_int(msg):
|
||||||
|
@ -141,7 +143,7 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
|
cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
|
||||||
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
|
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
|
||||||
return pid_arr
|
return pid_arr
|
||||||
except:
|
except (subprocess.CalledProcessError, ValueError) as e:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def alive(self, node):
|
def alive(self, node):
|
||||||
|
@ -180,6 +182,8 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
|
|
||||||
def stop_node(self, node):
|
def stop_node(self, node):
|
||||||
node.account.kill_process("java", allow_fail=True)
|
node.account.kill_process("java", allow_fail=True)
|
||||||
|
wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
|
||||||
|
err_msg="Timed out waiting for consumer to stop.")
|
||||||
|
|
||||||
def clean_node(self, node):
|
def clean_node(self, node):
|
||||||
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
|
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
|
||||||
|
|
|
@ -14,9 +14,11 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ducktape.services.background_thread import BackgroundThreadService
|
from ducktape.services.service import Service
|
||||||
|
from ducktape.utils.util import wait_until
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import subprocess
|
||||||
|
|
||||||
"""
|
"""
|
||||||
0.8.2.1 MirrorMaker options
|
0.8.2.1 MirrorMaker options
|
||||||
|
@ -51,27 +53,8 @@ Option Description
|
||||||
--whitelist <Java regex (String)> Whitelist of topics to mirror.
|
--whitelist <Java regex (String)> Whitelist of topics to mirror.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
"""
|
|
||||||
consumer config
|
|
||||||
|
|
||||||
zookeeper.connect={{ zookeeper_connect }}
|
class MirrorMaker(Service):
|
||||||
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
|
|
||||||
group.id={{ group_id|default('test-consumer-group') }}
|
|
||||||
|
|
||||||
{% if consumer_timeout_ms is not none %}
|
|
||||||
consumer.timeout.ms={{ consumer_timeout_ms }}
|
|
||||||
{% endif %}
|
|
||||||
"""
|
|
||||||
|
|
||||||
"""
|
|
||||||
producer config
|
|
||||||
|
|
||||||
metadata.broker.list={{ metadata_broker_list }}
|
|
||||||
producer.type={{ producer_type }} # sync or async
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class MirrorMaker(BackgroundThreadService):
|
|
||||||
|
|
||||||
# Root directory for persistent output
|
# Root directory for persistent output
|
||||||
PERSISTENT_ROOT = "/mnt/mirror_maker"
|
PERSISTENT_ROOT = "/mnt/mirror_maker"
|
||||||
|
@ -79,6 +62,7 @@ class MirrorMaker(BackgroundThreadService):
|
||||||
LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
|
LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
|
||||||
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
|
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
|
||||||
PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
|
PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
|
||||||
|
CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
|
||||||
KAFKA_HOME = "/opt/kafka/"
|
KAFKA_HOME = "/opt/kafka/"
|
||||||
|
|
||||||
logs = {
|
logs = {
|
||||||
|
@ -87,14 +71,14 @@ class MirrorMaker(BackgroundThreadService):
|
||||||
"collect_default": True}
|
"collect_default": True}
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, context, sources, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
|
def __init__(self, context, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
|
||||||
"""
|
"""
|
||||||
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
|
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
context: standard context
|
context: standard context
|
||||||
sources: list of one or more source kafka clusters
|
source: source Kafka cluster
|
||||||
target: target cluster
|
target: target Kafka cluster to which data will be mirrored
|
||||||
whitelist: whitelist regex for topics to mirror
|
whitelist: whitelist regex for topics to mirror
|
||||||
blacklist: blacklist regex for topics not to mirror
|
blacklist: blacklist regex for topics not to mirror
|
||||||
num_streams: number of consumer threads to create
|
num_streams: number of consumer threads to create
|
||||||
|
@ -106,50 +90,51 @@ class MirrorMaker(BackgroundThreadService):
|
||||||
self.num_streams = num_streams
|
self.num_streams = num_streams
|
||||||
self.whitelist = whitelist
|
self.whitelist = whitelist
|
||||||
self.blacklist = blacklist
|
self.blacklist = blacklist
|
||||||
self.sources = sources
|
self.source = source
|
||||||
self.target = target
|
self.target = target
|
||||||
|
|
||||||
def consumer_config_file(self, kafka):
|
|
||||||
return "consumer%s.properties" % kafka.zk.connect_setting()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def start_cmd(self):
|
def start_cmd(self):
|
||||||
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
|
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
|
||||||
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
|
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
|
||||||
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
|
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
|
||||||
for kafka in self.sources:
|
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
|
||||||
# One consumer config file per source kafka cluster
|
|
||||||
cmd += " --consumer.config %s" % self.consumer_config_file(kafka)
|
|
||||||
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
|
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
|
||||||
cmd += " --num.streams %d" % self.num_streams
|
cmd += " --num.streams %d" % self.num_streams
|
||||||
|
|
||||||
if self.whitelist is not None:
|
if self.whitelist is not None:
|
||||||
cmd += " --whitelist=%s" % self.whitelist
|
cmd += " --whitelist=\"%s\"" % self.whitelist
|
||||||
if self.blacklist is not None:
|
if self.blacklist is not None:
|
||||||
cmd += " --blacklist=%s" % self.blacklist
|
cmd += " --blacklist=\"%s\"" % self.blacklist
|
||||||
cmd += " 2>&1 1>>%s &" % MirrorMaker.LOG_FILE
|
cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
|
@property
|
||||||
|
def node(self):
|
||||||
|
"""Convenience method since this Service only ever has one node"""
|
||||||
|
if self.nodes:
|
||||||
|
return self.nodes[0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def pids(self, node):
|
def pids(self, node):
|
||||||
try:
|
try:
|
||||||
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
|
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
|
||||||
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
|
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
|
||||||
return pid_arr
|
return pid_arr
|
||||||
except:
|
except (subprocess.CalledProcessError, ValueError) as e:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def alive(self, node):
|
def alive(self):
|
||||||
return len(self.pids(node)) > 0
|
return len(self.pids(self.node)) > 0
|
||||||
|
|
||||||
def _worker(self, idx, node):
|
def start_node(self, node):
|
||||||
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
||||||
node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
|
node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
|
||||||
|
|
||||||
# Create, upload one consumer config file per source kafka cluster
|
# Create, upload one consumer config file for source cluster
|
||||||
for kafka in self.sources:
|
consumer_props = self.render('consumer.properties', zookeeper_connect=self.source.zk.connect_setting())
|
||||||
consumer_props = self.render('consumer.properties', zookeeper_connect=kafka.zk.connect_setting(),
|
node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props)
|
||||||
consumer_timeout_ms=self.consumer_timeout_ms)
|
|
||||||
node.account.create_file(self.consumer_config_file(kafka), consumer_props)
|
|
||||||
|
|
||||||
# Create, upload producer properties file for target cluster
|
# Create, upload producer properties file for target cluster
|
||||||
producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(),
|
producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(),
|
||||||
|
@ -162,11 +147,16 @@ class MirrorMaker(BackgroundThreadService):
|
||||||
|
|
||||||
# Run mirror maker
|
# Run mirror maker
|
||||||
cmd = self.start_cmd
|
cmd = self.start_cmd
|
||||||
self.logger.debug("Mirror maker %d command: %s", idx, cmd)
|
self.logger.debug("Mirror maker command: %s", cmd)
|
||||||
node.account.ssh(cmd, allow_fail=False)
|
node.account.ssh(cmd, allow_fail=False)
|
||||||
|
wait_until(lambda: self.alive(), timeout_sec=10, backoff_sec=.5,
|
||||||
|
err_msg="Mirror maker took to long to start.")
|
||||||
|
self.logger.debug("Mirror maker is alive")
|
||||||
|
|
||||||
def stop_node(self, node):
|
def stop_node(self, node):
|
||||||
node.account.kill_process("java", allow_fail=True)
|
node.account.kill_process("java", allow_fail=True)
|
||||||
|
wait_until(lambda: not self.alive(), timeout_sec=10, backoff_sec=.5,
|
||||||
|
err_msg="Mirror maker took to long to stop.")
|
||||||
|
|
||||||
def clean_node(self, node):
|
def clean_node(self, node):
|
||||||
node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
||||||
|
|
|
@ -14,6 +14,6 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
# see kafka.server.KafkaConfig for additional details and defaults
|
# see kafka.server.KafkaConfig for additional details and defaults
|
||||||
|
|
||||||
{% if consumer_timeout_ms is not none %}
|
{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
|
||||||
consumer.timeout.ms={{ consumer_timeout_ms }}
|
consumer.timeout.ms={{ consumer_timeout_ms }}
|
||||||
{% endif %}
|
{% endif %}
|
|
@ -18,6 +18,6 @@ zookeeper.connect={{ zookeeper_connect }}
|
||||||
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
|
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
|
||||||
group.id={{ group_id|default('test-consumer-group') }}
|
group.id={{ group_id|default('test-consumer-group') }}
|
||||||
|
|
||||||
{% if consumer_timeout_ms is not none %}
|
{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
|
||||||
consumer.timeout.ms={{ consumer_timeout_ms }}
|
consumer.timeout.ms={{ consumer_timeout_ms }}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
Loading…
Reference in New Issue