From 1b4b04935eafa2e93583ea5683c2e8851ed43476 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Tue, 18 Aug 2015 01:15:00 -0700 Subject: [PATCH] Added MirrorMaker service and a few corresponding sanity checks, as well as necessary config template files. A few additional updates to accomodate the change in wait_until from ducktape0.2.0->0.3.0 --- .../sanity_checks/test_console_consumer.py | 14 +- .../sanity_checks/test_mirror_maker.py | 98 ++++++++++ tests/kafkatest/services/console_consumer.py | 7 +- tests/kafkatest/services/mirror_maker.py | 173 ++++++++++++++++++ .../services/templates/consumer.properties | 23 +++ .../services/templates/kafka.properties | 80 -------- .../services/templates/producer.properties | 28 +++ ...og4j.properties => tools_log4j.properties} | 0 .../kafkatest/services/verifiable_producer.py | 3 + tests/kafkatest/tests/replication_test.py | 8 +- tests/setup.py | 2 +- 11 files changed, 342 insertions(+), 94 deletions(-) create mode 100644 tests/kafkatest/sanity_checks/test_mirror_maker.py create mode 100644 tests/kafkatest/services/mirror_maker.py create mode 100644 tests/kafkatest/services/templates/consumer.properties create mode 100644 tests/kafkatest/services/templates/producer.properties rename tests/kafkatest/services/templates/{console_consumer_log4j.properties => tools_log4j.properties} (100%) diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index cd8c8f91173..370f5c4b8ac 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -15,6 +15,7 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until +from ducktape.mark import parametrize from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -61,20 +62,21 @@ class ConsoleConsumerTest(Test): self.consumer.start() node = self.consumer.nodes[0] - if not wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2): - raise Exception("Consumer was too slow to start") + wait_until(lambda: self.consumer.alive(node), + timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") self.logger.info("consumer started in %s seconds " % str(time.time() - t0)) # Verify that log output is happening - if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10): - raise Exception("Timed out waiting for log file to exist") + wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10, + err_msg="Timed out waiting for logging to start.") assert line_count(node, ConsoleConsumer.LOG_FILE) > 0 # Verify no consumed messages assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 self.consumer.stop_node(node) - if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2): - raise Exception("Took too long for consumer to die.") + wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, + err_msg="Timed out waiting for consumer to stop.") + diff --git a/tests/kafkatest/sanity_checks/test_mirror_maker.py b/tests/kafkatest/sanity_checks/test_mirror_maker.py new file mode 100644 index 00000000000..a91a5841af0 --- /dev/null +++ b/tests/kafkatest/sanity_checks/test_mirror_maker.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.mirror_maker import MirrorMaker + + +class TestMirrorMakerService(Test): + """Sanity checks on console consumer service class.""" + def __init__(self, test_context): + super(TestMirrorMakerService, self).__init__(test_context) + + self.topic = "topic" + self.zk1 = ZookeeperService(test_context, num_nodes=1) + self.zk2 = ZookeeperService(test_context, num_nodes=1) + + self.k1 = KafkaService(test_context, num_nodes=1, zk=self.zk1, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + self.k2 = KafkaService(test_context, num_nodes=1, zk=self.zk2, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + + self.num_messages = 1000 + # This will produce to source kafka cluster + self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.k1, topic=self.topic, + max_messages=self.num_messages, throughput=1000) + self.mirror_maker = MirrorMaker(test_context, sources=[self.k1], target=self.k2, whitelist=self.topic) + + # This will consume from target kafka cluster + self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.k2, topic=self.topic, + consumer_timeout_ms=10000) + + def setUp(self): + # Source cluster + self.zk1.start() + self.k1.start() + + # Target cluster + self.zk2.start() + self.k2.start() + + def test_lifecycle(self): + """Start and stop a single-node MirrorMaker and validate that the process appears and disappears in a + reasonable amount of time. + """ + self.mirror_maker.start() + node = self.mirror_maker.nodes[0] + wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took too long to start.") + + self.mirror_maker.stop() + wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took to long to stop.") + + def test_end_to_end(self): + """ + Test end-to-end behavior under non-failure conditions. + + Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster. + One is source, and the other is target. + + - Start mirror maker. + - Produce a small number of messages to the source cluster. + - Consume messages from target. + - Confirm that number of consumed messages matches the number produced. + """ + self.mirror_maker.start() + node = self.mirror_maker.nodes[0] + wait_until(lambda: self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took too long to start.") + + self.producer.start() + self.producer.wait() + self.consumer.start() + self.consumer.wait() + assert len(self.consumer.messages_consumed[1]) == self.num_messages + + self.mirror_maker.stop() + wait_until(lambda: not self.mirror_maker.alive(node), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took to long to stop.") diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 18c9f63f2fd..fb300361de5 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -91,7 +91,7 @@ class ConsoleConsumer(BackgroundThreadService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None): + def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None): """ Args: context: standard context @@ -161,7 +161,7 @@ class ConsoleConsumer(BackgroundThreadService): node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) # Create and upload log properties - log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE) + log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE) node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config) # Run and capture output @@ -169,7 +169,8 @@ class ConsoleConsumer(BackgroundThreadService): self.logger.debug("Console consumer %d command: %s", idx, cmd) for line in node.account.ssh_capture(cmd, allow_fail=False): msg = line.strip() - msg = self.message_validator(msg) + if self.message_validator is not None: + msg = self.message_validator(msg) if msg is not None: self.logger.debug("consumed a message: " + str(msg)) self.messages_consumed[idx].append(msg) diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py new file mode 100644 index 00000000000..a6a1448d60c --- /dev/null +++ b/tests/kafkatest/services/mirror_maker.py @@ -0,0 +1,173 @@ + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + +import os + +""" +0.8.2.1 MirrorMaker options + +Option Description +------ ----------- +--abort.on.send.failure +--blacklist Blacklist of topics to mirror. +--consumer.config Embedded consumer config for consuming + from the source cluster. +--consumer.rebalance.listener +--help Print this message. +--message.handler producer. +--message.handler.args +--num.streams (default: 1) +--offset.commit.interval.ms +--producer.config Embedded producer config. +--rebalance.listener.args +--whitelist Whitelist of topics to mirror. +""" + +""" +consumer config + +zookeeper.connect={{ zookeeper_connect }} +zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }} +group.id={{ group_id|default('test-consumer-group') }} + +{% if consumer_timeout_ms is not none %} +consumer.timeout.ms={{ consumer_timeout_ms }} +{% endif %} +""" + +""" +producer config + +metadata.broker.list={{ metadata_broker_list }} +producer.type={{ producer_type }} # sync or async +""" + + +class MirrorMaker(BackgroundThreadService): + + # Root directory for persistent output + PERSISTENT_ROOT = "/mnt/mirror_maker" + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log") + LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties") + KAFKA_HOME = "/opt/kafka/" + + logs = { + "mirror_maker_log": { + "path": LOG_FILE, + "collect_default": True} + } + + def __init__(self, context, sources, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None): + """ + MirrorMaker mirrors messages from one or more source clusters to a single destination cluster. + + Args: + context: standard context + sources: list of one or more source kafka clusters + target: target cluster + whitelist: whitelist regex for topics to mirror + blacklist: blacklist regex for topics not to mirror + num_streams: number of consumer threads to create + consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages + """ + super(MirrorMaker, self).__init__(context, num_nodes=1) + + self.consumer_timeout_ms = consumer_timeout_ms + self.num_streams = num_streams + self.whitelist = whitelist + self.blacklist = blacklist + self.sources = sources + self.target = target + + def consumer_config_file(self, kafka): + return "consumer%s.properties" % kafka.zk.connect_setting() + + @property + def start_cmd(self): + cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG + cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME + for kafka in self.sources: + # One consumer config file per source kafka cluster + cmd += " --consumer.config %s" % self.consumer_config_file(kafka) + cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG + cmd += " --num.streams %d" % self.num_streams + + if self.whitelist is not None: + cmd += " --whitelist=%s" % self.whitelist + if self.blacklist is not None: + cmd += " --blacklist=%s" % self.blacklist + cmd += " 2>&1 1>>%s &" % MirrorMaker.LOG_FILE + return cmd + + def pids(self, node): + try: + cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'" + pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] + return pid_arr + except: + return [] + + def alive(self, node): + return len(self.pids(node)) > 0 + + def _worker(self, idx, node): + node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) + node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False) + + # Create, upload one consumer config file per source kafka cluster + for kafka in self.sources: + consumer_props = self.render('consumer.properties', zookeeper_connect=kafka.zk.connect_setting(), + consumer_timeout_ms=self.consumer_timeout_ms) + node.account.create_file(self.consumer_config_file(kafka), consumer_props) + + # Create, upload producer properties file for target cluster + producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(), + producer_type="async") + node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props) + + # Create and upload log properties + log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE) + node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config) + + # Run mirror maker + cmd = self.start_cmd + self.logger.debug("Mirror maker %d command: %s", idx, cmd) + node.account.ssh(cmd, allow_fail=False) + + def stop_node(self, node): + node.account.kill_process("java", allow_fail=True) + + def clean_node(self, node): + node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) + diff --git a/tests/kafkatest/services/templates/consumer.properties b/tests/kafkatest/services/templates/consumer.properties new file mode 100644 index 00000000000..8ac53b854e1 --- /dev/null +++ b/tests/kafkatest/services/templates/consumer.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.consumer.ConsumerConfig for more details + +zookeeper.connect={{ zookeeper_connect }} +zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }} +group.id={{ group_id|default('test-consumer-group') }} + +{% if consumer_timeout_ms is not none %} +consumer.timeout.ms={{ consumer_timeout_ms }} +{% endif %} diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties index db1077a4a4e..6650d237849 100644 --- a/tests/kafkatest/services/templates/kafka.properties +++ b/tests/kafkatest/services/templates/kafka.properties @@ -14,108 +14,28 @@ # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults -############################# Server Basics ############################# -# The id of the broker. This must be set to a unique integer for each broker. broker.id={{ broker_id }} - -############################# Socket Server Settings ############################# - -# The port the socket server listens on port=9092 - -# Hostname the broker will bind to. If not set, the server will bind to all interfaces #host.name=localhost - -# Hostname the broker will advertise to producers and consumers. If not set, it uses the -# value for "host.name" if configured. Otherwise, it will use the value returned from -# java.net.InetAddress.getCanonicalHostName(). advertised.host.name={{ node.account.hostname }} - -# The port to publish to ZooKeeper for clients to use. If this is not set, -# it will publish the same port that the broker binds to. #advertised.port= - -# The number of threads handling network requests num.network.threads=3 - -# The number of threads doing disk I/O num.io.threads=8 - -# The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 - -# The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=65536 - -# The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 - -############################# Log Basics ############################# - -# A comma seperated list of directories under which to store log files log.dirs=/mnt/kafka-logs - -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. num.partitions=1 - -# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. -# This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 - -############################# Log Flush Policy ############################# - -# Messages are immediately written to the filesystem but by default we only fsync() to sync -# the OS cache lazily. The following configurations control the flush of data to disk. -# There are a few important trade-offs here: -# 1. Durability: Unflushed data may be lost if you are not using replication. -# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. -# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. -# The settings below allow one to configure the flush policy to flush data after a period of time or -# every N messages (or both). This can be done globally and overridden on a per-topic basis. - -# The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 - -# The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 - -############################# Log Retention Policy ############################# - -# The following configurations control the disposal of log segments. The policy can -# be set to delete segments after a period of time, or after a given size has accumulated. -# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens -# from the end of the log. - -# The minimum age of a log file to be eligible for deletion log.retention.hours=168 - -# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 - -# The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 - -# The interval at which log segments are checked to see if they can be deleted according -# to the retention policies log.retention.check.interval.ms=300000 - -# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. -# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". -# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. zookeeper.connect={{ zk.connect_setting() }} - -# Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=2000 diff --git a/tests/kafkatest/services/templates/producer.properties b/tests/kafkatest/services/templates/producer.properties new file mode 100644 index 00000000000..ede60c8f322 --- /dev/null +++ b/tests/kafkatest/services/templates/producer.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.producer.ProducerConfig for more details + +metadata.broker.list={{ broker_list }} +bootstrap.servers = {{ broker_list }} +producer.type={{ producer_type }} # sync or async +compression.codec=none +serializer.class=kafka.serializer.DefaultEncoder + +#partitioner.class= +#compressed.topics= +#queue.buffering.max.ms= +#queue.buffering.max.messages= +#queue.enqueue.timeout.ms= +#batch.num.messages= diff --git a/tests/kafkatest/services/templates/console_consumer_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties similarity index 100% rename from tests/kafkatest/services/templates/console_consumer_log4j.properties rename to tests/kafkatest/services/templates/tools_log4j.properties diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index cca82277022..ec6272c40e1 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -89,6 +89,9 @@ class VerifiableProducer(BackgroundThreadService): def stop_node(self, node): node.account.kill_process("VerifiableProducer", allow_fail=False) + if self.worker_threads is None: + return + # block until the corresponding thread exits if len(self.worker_threads) >= self.idx(node): # Need to guard this because stop is preemptively called before the worker threads are added and started diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index fed1ea1f8e2..755fb42585f 100644 --- a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.console_consumer import ConsoleConsumer, is_int import signal import time @@ -76,12 +76,12 @@ class ReplicationTest(Test): """ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int) # Produce in a background thread while driving broker failures self.producer.start() - if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5): - raise RuntimeError("Producer failed to start in a reasonable amount of time.") + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5, + err_msg="Producer failed to start in a reasonable amount of time.") failure() self.producer.stop() diff --git a/tests/setup.py b/tests/setup.py index 5ce4bb797aa..a2fa71aaaf1 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -23,5 +23,5 @@ setup(name="kafkatest", platforms=["any"], license="apache2.0", packages=find_packages(), - requires=["ducktape(>=0.2.0)"] + requires=["ducktape(==0.3.0)"] )