mirror of https://github.com/apache/kafka.git
Added MirrorMaker service and a few corresponding sanity checks, as well as necessary config template files. A few additional updates to accomodate the change in wait_until from ducktape0.2.0->0.3.0
This commit is contained in:
parent
786867c2e1
commit
1b4b04935e
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from ducktape.utils.util import wait_until
|
from ducktape.utils.util import wait_until
|
||||||
|
from ducktape.mark import parametrize
|
||||||
|
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.services.kafka import KafkaService
|
from kafkatest.services.kafka import KafkaService
|
||||||
|
@ -61,20 +62,21 @@ class ConsoleConsumerTest(Test):
|
||||||
self.consumer.start()
|
self.consumer.start()
|
||||||
node = self.consumer.nodes[0]
|
node = self.consumer.nodes[0]
|
||||||
|
|
||||||
if not wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
|
wait_until(lambda: self.consumer.alive(node),
|
||||||
raise Exception("Consumer was too slow to start")
|
timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
|
||||||
self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
|
self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
|
||||||
|
|
||||||
# Verify that log output is happening
|
# Verify that log output is happening
|
||||||
if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10):
|
wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10,
|
||||||
raise Exception("Timed out waiting for log file to exist")
|
err_msg="Timed out waiting for logging to start.")
|
||||||
assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
|
assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
|
||||||
|
|
||||||
# Verify no consumed messages
|
# Verify no consumed messages
|
||||||
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
|
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
|
||||||
|
|
||||||
self.consumer.stop_node(node)
|
self.consumer.stop_node(node)
|
||||||
if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
|
wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2,
|
||||||
raise Exception("Took too long for consumer to die.")
|
err_msg="Timed out waiting for consumer to stop.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
from ducktape.tests.test import Test
|
||||||
|
from ducktape.utils.util import wait_until
|
||||||
|
|
||||||
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
|
from kafkatest.services.kafka import KafkaService
|
||||||
|
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||||
|
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
|
from kafkatest.services.mirror_maker import MirrorMaker
|
||||||
|
|
||||||
|
|
||||||
|
class TestMirrorMakerService(Test):
|
||||||
|
"""Sanity checks on console consumer service class."""
|
||||||
|
def __init__(self, test_context):
|
||||||
|
super(TestMirrorMakerService, self).__init__(test_context)
|
||||||
|
|
||||||
|
self.topic = "topic"
|
||||||
|
self.zk1 = ZookeeperService(test_context, num_nodes=1)
|
||||||
|
self.zk2 = ZookeeperService(test_context, num_nodes=1)
|
||||||
|
|
||||||
|
self.k1 = KafkaService(test_context, num_nodes=1, zk=self.zk1,
|
||||||
|
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
|
||||||
|
self.k2 = KafkaService(test_context, num_nodes=1, zk=self.zk2,
|
||||||
|
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
|
||||||
|
|
||||||
|
self.num_messages = 1000
|
||||||
|
# This will produce to source kafka cluster
|
||||||
|
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.k1, topic=self.topic,
|
||||||
|
max_messages=self.num_messages, throughput=1000)
|
||||||
|
self.mirror_maker = MirrorMaker(test_context, sources=[self.k1], target=self.k2, whitelist=self.topic)
|
||||||
|
|
||||||
|
# This will consume from target kafka cluster
|
||||||
|
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.k2, topic=self.topic,
|
||||||
|
consumer_timeout_ms=10000)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
# Source cluster
|
||||||
|
self.zk1.start()
|
||||||
|
self.k1.start()
|
||||||
|
|
||||||
|
# Target cluster
|
||||||
|
self.zk2.start()
|
||||||
|
self.k2.start()
|
||||||
|
|
||||||
|
def test_lifecycle(self):
|
||||||
|
"""Start and stop a single-node MirrorMaker and validate that the process appears and disappears in a
|
||||||
|
reasonable amount of time.
|
||||||
|
"""
|
||||||
|
self.mirror_maker.start()
|
||||||
|
node = self.mirror_maker.nodes[0]
|
||||||
|
wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
||||||
|
err_msg="Mirror maker took too long to start.")
|
||||||
|
|
||||||
|
self.mirror_maker.stop()
|
||||||
|
wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
||||||
|
err_msg="Mirror maker took to long to stop.")
|
||||||
|
|
||||||
|
def test_end_to_end(self):
|
||||||
|
"""
|
||||||
|
Test end-to-end behavior under non-failure conditions.
|
||||||
|
|
||||||
|
Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
|
||||||
|
One is source, and the other is target.
|
||||||
|
|
||||||
|
- Start mirror maker.
|
||||||
|
- Produce a small number of messages to the source cluster.
|
||||||
|
- Consume messages from target.
|
||||||
|
- Confirm that number of consumed messages matches the number produced.
|
||||||
|
"""
|
||||||
|
self.mirror_maker.start()
|
||||||
|
node = self.mirror_maker.nodes[0]
|
||||||
|
wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
||||||
|
err_msg="Mirror maker took too long to start.")
|
||||||
|
|
||||||
|
self.producer.start()
|
||||||
|
self.producer.wait()
|
||||||
|
self.consumer.start()
|
||||||
|
self.consumer.wait()
|
||||||
|
assert len(self.consumer.messages_consumed[1]) == self.num_messages
|
||||||
|
|
||||||
|
self.mirror_maker.stop()
|
||||||
|
wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5,
|
||||||
|
err_msg="Mirror maker took to long to stop.")
|
|
@ -91,7 +91,7 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
"collect_default": True}
|
"collect_default": True}
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
|
def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
context: standard context
|
context: standard context
|
||||||
|
@ -161,7 +161,7 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
|
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
|
||||||
|
|
||||||
# Create and upload log properties
|
# Create and upload log properties
|
||||||
log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
|
log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
|
||||||
node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
|
node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
|
||||||
|
|
||||||
# Run and capture output
|
# Run and capture output
|
||||||
|
@ -169,6 +169,7 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
self.logger.debug("Console consumer %d command: %s", idx, cmd)
|
self.logger.debug("Console consumer %d command: %s", idx, cmd)
|
||||||
for line in node.account.ssh_capture(cmd, allow_fail=False):
|
for line in node.account.ssh_capture(cmd, allow_fail=False):
|
||||||
msg = line.strip()
|
msg = line.strip()
|
||||||
|
if self.message_validator is not None:
|
||||||
msg = self.message_validator(msg)
|
msg = self.message_validator(msg)
|
||||||
if msg is not None:
|
if msg is not None:
|
||||||
self.logger.debug("consumed a message: " + str(msg))
|
self.logger.debug("consumed a message: " + str(msg))
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ducktape.services.background_thread import BackgroundThreadService
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
"""
|
||||||
|
0.8.2.1 MirrorMaker options
|
||||||
|
|
||||||
|
Option Description
|
||||||
|
------ -----------
|
||||||
|
--abort.on.send.failure <Stop the Configure the mirror maker to exit on
|
||||||
|
entire mirror maker when a send a failed send. (default: true)
|
||||||
|
failure occurs>
|
||||||
|
--blacklist <Java regex (String)> Blacklist of topics to mirror.
|
||||||
|
--consumer.config <config file> Embedded consumer config for consuming
|
||||||
|
from the source cluster.
|
||||||
|
--consumer.rebalance.listener <A The consumer rebalance listener to use
|
||||||
|
custom rebalance listener of type for mirror maker consumer.
|
||||||
|
ConsumerRebalanceListener>
|
||||||
|
--help Print this message.
|
||||||
|
--message.handler <A custom message Message handler which will process
|
||||||
|
handler of type every record in-between consumer and
|
||||||
|
MirrorMakerMessageHandler> producer.
|
||||||
|
--message.handler.args <Arguments Arguments used by custom rebalance
|
||||||
|
passed to message handler listener for mirror maker consumer
|
||||||
|
constructor.>
|
||||||
|
--num.streams <Integer: Number of Number of consumption streams.
|
||||||
|
threads> (default: 1)
|
||||||
|
--offset.commit.interval.ms <Integer: Offset commit interval in ms (default:
|
||||||
|
offset commit interval in 60000)
|
||||||
|
millisecond>
|
||||||
|
--producer.config <config file> Embedded producer config.
|
||||||
|
--rebalance.listener.args <Arguments Arguments used by custom rebalance
|
||||||
|
passed to custom rebalance listener listener for mirror maker consumer
|
||||||
|
constructor as a string.>
|
||||||
|
--whitelist <Java regex (String)> Whitelist of topics to mirror.
|
||||||
|
"""
|
||||||
|
|
||||||
|
"""
|
||||||
|
consumer config
|
||||||
|
|
||||||
|
zookeeper.connect={{ zookeeper_connect }}
|
||||||
|
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
|
||||||
|
group.id={{ group_id|default('test-consumer-group') }}
|
||||||
|
|
||||||
|
{% if consumer_timeout_ms is not none %}
|
||||||
|
consumer.timeout.ms={{ consumer_timeout_ms }}
|
||||||
|
{% endif %}
|
||||||
|
"""
|
||||||
|
|
||||||
|
"""
|
||||||
|
producer config
|
||||||
|
|
||||||
|
metadata.broker.list={{ metadata_broker_list }}
|
||||||
|
producer.type={{ producer_type }} # sync or async
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class MirrorMaker(BackgroundThreadService):
|
||||||
|
|
||||||
|
# Root directory for persistent output
|
||||||
|
PERSISTENT_ROOT = "/mnt/mirror_maker"
|
||||||
|
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
|
||||||
|
LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
|
||||||
|
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
|
||||||
|
PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
|
||||||
|
KAFKA_HOME = "/opt/kafka/"
|
||||||
|
|
||||||
|
logs = {
|
||||||
|
"mirror_maker_log": {
|
||||||
|
"path": LOG_FILE,
|
||||||
|
"collect_default": True}
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, context, sources, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
|
||||||
|
"""
|
||||||
|
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
context: standard context
|
||||||
|
sources: list of one or more source kafka clusters
|
||||||
|
target: target cluster
|
||||||
|
whitelist: whitelist regex for topics to mirror
|
||||||
|
blacklist: blacklist regex for topics not to mirror
|
||||||
|
num_streams: number of consumer threads to create
|
||||||
|
consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages
|
||||||
|
"""
|
||||||
|
super(MirrorMaker, self).__init__(context, num_nodes=1)
|
||||||
|
|
||||||
|
self.consumer_timeout_ms = consumer_timeout_ms
|
||||||
|
self.num_streams = num_streams
|
||||||
|
self.whitelist = whitelist
|
||||||
|
self.blacklist = blacklist
|
||||||
|
self.sources = sources
|
||||||
|
self.target = target
|
||||||
|
|
||||||
|
def consumer_config_file(self, kafka):
|
||||||
|
return "consumer%s.properties" % kafka.zk.connect_setting()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def start_cmd(self):
|
||||||
|
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
|
||||||
|
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
|
||||||
|
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
|
||||||
|
for kafka in self.sources:
|
||||||
|
# One consumer config file per source kafka cluster
|
||||||
|
cmd += " --consumer.config %s" % self.consumer_config_file(kafka)
|
||||||
|
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
|
||||||
|
cmd += " --num.streams %d" % self.num_streams
|
||||||
|
|
||||||
|
if self.whitelist is not None:
|
||||||
|
cmd += " --whitelist=%s" % self.whitelist
|
||||||
|
if self.blacklist is not None:
|
||||||
|
cmd += " --blacklist=%s" % self.blacklist
|
||||||
|
cmd += " 2>&1 1>>%s &" % MirrorMaker.LOG_FILE
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
def pids(self, node):
|
||||||
|
try:
|
||||||
|
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
|
||||||
|
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
|
||||||
|
return pid_arr
|
||||||
|
except:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def alive(self, node):
|
||||||
|
return len(self.pids(node)) > 0
|
||||||
|
|
||||||
|
def _worker(self, idx, node):
|
||||||
|
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
||||||
|
node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
|
||||||
|
|
||||||
|
# Create, upload one consumer config file per source kafka cluster
|
||||||
|
for kafka in self.sources:
|
||||||
|
consumer_props = self.render('consumer.properties', zookeeper_connect=kafka.zk.connect_setting(),
|
||||||
|
consumer_timeout_ms=self.consumer_timeout_ms)
|
||||||
|
node.account.create_file(self.consumer_config_file(kafka), consumer_props)
|
||||||
|
|
||||||
|
# Create, upload producer properties file for target cluster
|
||||||
|
producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(),
|
||||||
|
producer_type="async")
|
||||||
|
node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props)
|
||||||
|
|
||||||
|
# Create and upload log properties
|
||||||
|
log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE)
|
||||||
|
node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
|
||||||
|
|
||||||
|
# Run mirror maker
|
||||||
|
cmd = self.start_cmd
|
||||||
|
self.logger.debug("Mirror maker %d command: %s", idx, cmd)
|
||||||
|
node.account.ssh(cmd, allow_fail=False)
|
||||||
|
|
||||||
|
def stop_node(self, node):
|
||||||
|
node.account.kill_process("java", allow_fail=True)
|
||||||
|
|
||||||
|
def clean_node(self, node):
|
||||||
|
node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
# see kafka.consumer.ConsumerConfig for more details
|
||||||
|
|
||||||
|
zookeeper.connect={{ zookeeper_connect }}
|
||||||
|
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
|
||||||
|
group.id={{ group_id|default('test-consumer-group') }}
|
||||||
|
|
||||||
|
{% if consumer_timeout_ms is not none %}
|
||||||
|
consumer.timeout.ms={{ consumer_timeout_ms }}
|
||||||
|
{% endif %}
|
|
@ -14,108 +14,28 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
# see kafka.server.KafkaConfig for additional details and defaults
|
# see kafka.server.KafkaConfig for additional details and defaults
|
||||||
|
|
||||||
############################# Server Basics #############################
|
|
||||||
|
|
||||||
# The id of the broker. This must be set to a unique integer for each broker.
|
|
||||||
broker.id={{ broker_id }}
|
broker.id={{ broker_id }}
|
||||||
|
|
||||||
############################# Socket Server Settings #############################
|
|
||||||
|
|
||||||
# The port the socket server listens on
|
|
||||||
port=9092
|
port=9092
|
||||||
|
|
||||||
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
|
|
||||||
#host.name=localhost
|
#host.name=localhost
|
||||||
|
|
||||||
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
|
|
||||||
# value for "host.name" if configured. Otherwise, it will use the value returned from
|
|
||||||
# java.net.InetAddress.getCanonicalHostName().
|
|
||||||
advertised.host.name={{ node.account.hostname }}
|
advertised.host.name={{ node.account.hostname }}
|
||||||
|
|
||||||
# The port to publish to ZooKeeper for clients to use. If this is not set,
|
|
||||||
# it will publish the same port that the broker binds to.
|
|
||||||
#advertised.port=<port accessible by clients>
|
#advertised.port=<port accessible by clients>
|
||||||
|
|
||||||
# The number of threads handling network requests
|
|
||||||
num.network.threads=3
|
num.network.threads=3
|
||||||
|
|
||||||
# The number of threads doing disk I/O
|
|
||||||
num.io.threads=8
|
num.io.threads=8
|
||||||
|
|
||||||
# The send buffer (SO_SNDBUF) used by the socket server
|
|
||||||
socket.send.buffer.bytes=102400
|
socket.send.buffer.bytes=102400
|
||||||
|
|
||||||
# The receive buffer (SO_RCVBUF) used by the socket server
|
|
||||||
socket.receive.buffer.bytes=65536
|
socket.receive.buffer.bytes=65536
|
||||||
|
|
||||||
# The maximum size of a request that the socket server will accept (protection against OOM)
|
|
||||||
socket.request.max.bytes=104857600
|
socket.request.max.bytes=104857600
|
||||||
|
|
||||||
|
|
||||||
############################# Log Basics #############################
|
|
||||||
|
|
||||||
# A comma seperated list of directories under which to store log files
|
|
||||||
log.dirs=/mnt/kafka-logs
|
log.dirs=/mnt/kafka-logs
|
||||||
|
|
||||||
# The default number of log partitions per topic. More partitions allow greater
|
|
||||||
# parallelism for consumption, but this will also result in more files across
|
|
||||||
# the brokers.
|
|
||||||
num.partitions=1
|
num.partitions=1
|
||||||
|
|
||||||
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
|
|
||||||
# This value is recommended to be increased for installations with data dirs located in RAID array.
|
|
||||||
num.recovery.threads.per.data.dir=1
|
num.recovery.threads.per.data.dir=1
|
||||||
|
|
||||||
############################# Log Flush Policy #############################
|
|
||||||
|
|
||||||
# Messages are immediately written to the filesystem but by default we only fsync() to sync
|
|
||||||
# the OS cache lazily. The following configurations control the flush of data to disk.
|
|
||||||
# There are a few important trade-offs here:
|
|
||||||
# 1. Durability: Unflushed data may be lost if you are not using replication.
|
|
||||||
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
|
|
||||||
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
|
|
||||||
# The settings below allow one to configure the flush policy to flush data after a period of time or
|
|
||||||
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
|
|
||||||
|
|
||||||
# The number of messages to accept before forcing a flush of data to disk
|
|
||||||
#log.flush.interval.messages=10000
|
#log.flush.interval.messages=10000
|
||||||
|
|
||||||
# The maximum amount of time a message can sit in a log before we force a flush
|
|
||||||
#log.flush.interval.ms=1000
|
#log.flush.interval.ms=1000
|
||||||
|
|
||||||
############################# Log Retention Policy #############################
|
|
||||||
|
|
||||||
# The following configurations control the disposal of log segments. The policy can
|
|
||||||
# be set to delete segments after a period of time, or after a given size has accumulated.
|
|
||||||
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
|
|
||||||
# from the end of the log.
|
|
||||||
|
|
||||||
# The minimum age of a log file to be eligible for deletion
|
|
||||||
log.retention.hours=168
|
log.retention.hours=168
|
||||||
|
|
||||||
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
|
|
||||||
# segments don't drop below log.retention.bytes.
|
|
||||||
#log.retention.bytes=1073741824
|
#log.retention.bytes=1073741824
|
||||||
|
|
||||||
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
|
|
||||||
log.segment.bytes=1073741824
|
log.segment.bytes=1073741824
|
||||||
|
|
||||||
# The interval at which log segments are checked to see if they can be deleted according
|
|
||||||
# to the retention policies
|
|
||||||
log.retention.check.interval.ms=300000
|
log.retention.check.interval.ms=300000
|
||||||
|
|
||||||
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
|
|
||||||
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
|
|
||||||
log.cleaner.enable=false
|
log.cleaner.enable=false
|
||||||
|
|
||||||
############################# Zookeeper #############################
|
|
||||||
|
|
||||||
# Zookeeper connection string (see zookeeper docs for details).
|
|
||||||
# This is a comma separated host:port pairs, each corresponding to a zk
|
|
||||||
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
|
|
||||||
# You can also append an optional chroot string to the urls to specify the
|
|
||||||
# root directory for all kafka znodes.
|
|
||||||
zookeeper.connect={{ zk.connect_setting() }}
|
zookeeper.connect={{ zk.connect_setting() }}
|
||||||
|
|
||||||
# Timeout in ms for connecting to zookeeper
|
|
||||||
zookeeper.connection.timeout.ms=2000
|
zookeeper.connection.timeout.ms=2000
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
# see kafka.producer.ProducerConfig for more details
|
||||||
|
|
||||||
|
metadata.broker.list={{ broker_list }}
|
||||||
|
bootstrap.servers = {{ broker_list }}
|
||||||
|
producer.type={{ producer_type }} # sync or async
|
||||||
|
compression.codec=none
|
||||||
|
serializer.class=kafka.serializer.DefaultEncoder
|
||||||
|
|
||||||
|
#partitioner.class=
|
||||||
|
#compressed.topics=
|
||||||
|
#queue.buffering.max.ms=
|
||||||
|
#queue.buffering.max.messages=
|
||||||
|
#queue.enqueue.timeout.ms=
|
||||||
|
#batch.num.messages=
|
|
@ -89,6 +89,9 @@ class VerifiableProducer(BackgroundThreadService):
|
||||||
|
|
||||||
def stop_node(self, node):
|
def stop_node(self, node):
|
||||||
node.account.kill_process("VerifiableProducer", allow_fail=False)
|
node.account.kill_process("VerifiableProducer", allow_fail=False)
|
||||||
|
if self.worker_threads is None:
|
||||||
|
return
|
||||||
|
|
||||||
# block until the corresponding thread exits
|
# block until the corresponding thread exits
|
||||||
if len(self.worker_threads) >= self.idx(node):
|
if len(self.worker_threads) >= self.idx(node):
|
||||||
# Need to guard this because stop is preemptively called before the worker threads are added and started
|
# Need to guard this because stop is preemptively called before the worker threads are added and started
|
||||||
|
|
|
@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.services.kafka import KafkaService
|
from kafkatest.services.kafka import KafkaService
|
||||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
from kafkatest.services.console_consumer import ConsoleConsumer, is_int
|
||||||
|
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
@ -76,12 +76,12 @@ class ReplicationTest(Test):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
|
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
|
||||||
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)
|
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int)
|
||||||
|
|
||||||
# Produce in a background thread while driving broker failures
|
# Produce in a background thread while driving broker failures
|
||||||
self.producer.start()
|
self.producer.start()
|
||||||
if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
|
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
|
||||||
raise RuntimeError("Producer failed to start in a reasonable amount of time.")
|
err_msg="Producer failed to start in a reasonable amount of time.")
|
||||||
failure()
|
failure()
|
||||||
self.producer.stop()
|
self.producer.stop()
|
||||||
|
|
||||||
|
|
|
@ -23,5 +23,5 @@ setup(name="kafkatest",
|
||||||
platforms=["any"],
|
platforms=["any"],
|
||||||
license="apache2.0",
|
license="apache2.0",
|
||||||
packages=find_packages(),
|
packages=find_packages(),
|
||||||
requires=["ducktape(>=0.2.0)"]
|
requires=["ducktape(==0.3.0)"]
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue