From 62e043a86565cc9bc485658d6c6d176e9aff620f Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Sun, 11 Dec 2016 18:43:23 -0800 Subject: [PATCH] KAFKA-4140: Upgrade to ducktape 0.6.0 and make system tests parallel friendly Updates to take advantage of soon-to-be-released ducktape features. Author: Geoff Anderson Author: Ewen Cheslack-Postava Reviewers: Ewen Cheslack-Postava Closes #1834 from granders/systest-parallel-friendly --- .../benchmarks/core/benchmark_test.py | 13 ++- .../streams/streams_simple_benchmark_test.py | 5 +- .../sanity_checks/test_console_consumer.py | 12 ++- .../sanity_checks/test_kafka_version.py | 3 + .../test_performance_services.py | 2 + .../sanity_checks/test_verifiable_producer.py | 2 + tests/kafkatest/services/connect.py | 3 +- tests/kafkatest/services/console_consumer.py | 5 +- tests/kafkatest/services/kafka/kafka.py | 10 +-- .../services/kafka_log4j_appender.py | 3 +- tests/kafkatest/services/mirror_maker.py | 4 +- tests/kafkatest/services/monitor/jmx.py | 44 ++++++--- .../performance/producer_performance.py | 12 ++- .../performance/streams_performance.py | 2 +- .../services/replica_verification_tool.py | 3 +- tests/kafkatest/services/security/minikdc.py | 43 ++++++++- .../services/security/security_config.py | 49 +++++----- .../kafkatest/services/verifiable_consumer.py | 4 +- .../kafkatest/services/verifiable_producer.py | 90 +++++++++++++------ tests/kafkatest/services/zookeeper.py | 8 +- .../tests/client/compression_test.py | 3 + .../client/consumer_rolling_upgrade_test.py | 4 +- tests/kafkatest/tests/client/consumer_test.py | 11 ++- .../client/message_format_change_test.py | 4 +- tests/kafkatest/tests/client/quota_test.py | 2 + .../tests/connect/connect_distributed_test.py | 30 ++++--- .../tests/connect/connect_rest_test.py | 9 +- tests/kafkatest/tests/connect/connect_test.py | 19 ++-- .../compatibility_test_new_broker_test.py | 4 +- .../tests/core/consumer_group_command_test.py | 4 + .../tests/core/get_offset_shell_test.py | 6 +- .../kafkatest/tests/core/mirror_maker_test.py | 13 ++- .../tests/core/reassign_partitions_test.py | 3 + .../kafkatest/tests/core/replication_test.py | 5 +- .../core/security_rolling_upgrade_test.py | 12 ++- tests/kafkatest/tests/core/security_test.py | 55 ++++++++---- .../tests/core/simple_consumer_shell_test.py | 4 + tests/kafkatest/tests/core/throttling_test.py | 2 + tests/kafkatest/tests/core/upgrade_test.py | 5 ++ .../core/zookeeper_security_upgrade_test.py | 6 +- .../tests/streams/streams_bounce_test.py | 8 +- .../tests/streams/streams_smoke_test.py | 11 ++- .../tests/tools/log4j_appender_test.py | 8 +- .../tests/tools/replica_verification_test.py | 8 +- tests/setup.py | 2 +- 45 files changed, 394 insertions(+), 161 deletions(-) diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 4dbf902432c..14fab2f45ec 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -15,6 +15,7 @@ from ducktape.mark import matrix from ducktape.mark import parametrize +from ducktape.mark.resource import cluster from ducktape.services.service import Service from ducktape.tests.test import Test @@ -63,11 +64,13 @@ class Benchmark(Test): self.kafka.log_level = "INFO" # We don't DEBUG logging here self.kafka.start() + @cluster(num_nodes=5) @parametrize(acks=1, topic=TOPIC_REP_ONE) @parametrize(acks=1, topic=TOPIC_REP_THREE) @parametrize(acks=-1, topic=TOPIC_REP_THREE) - @parametrize(acks=1, topic=TOPIC_REP_THREE, 
num_producers=3) @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL']) + @cluster(num_nodes=7) + @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3) def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK), broker_version=str(TRUNK)): @@ -97,6 +100,7 @@ class Benchmark(Test): self.producer.run() return compute_aggregate_throughput(self.producer) + @cluster(num_nodes=5) @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT', @@ -152,8 +156,11 @@ class Benchmark(Test): self.logger.info("\n".join(summary)) return data + @cluster(num_nodes=5) @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"]) + @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) + @cluster(num_nodes=6) + @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"]) def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT", interbroker_security_protocol=None, client_version=str(TRUNK), broker_version=str(TRUNK)): @@ -181,6 +188,7 @@ class Benchmark(Test): self.perf.run() return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) + @cluster(num_nodes=6) @parametrize(security_protocol='PLAINTEXT', new_consumer=False) @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) @@ -229,6 +237,7 @@ class Benchmark(Test): self.logger.info("\n".join(summary)) return data + @cluster(num_nodes=6) @parametrize(security_protocol='PLAINTEXT', new_consumer=False) @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index de687e69b6f..ab9b112b4fb 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
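The decorator stacking above is the core parallel-friendly change in this patch: each @cluster(num_nodes=N) annotation declares how many cluster nodes the @parametrize/@matrix entries grouped under it consume, so the ducktape 0.6.0 scheduler knows the footprint of every generated test. A minimal sketch of the idiom, assuming the ducktape 0.6.0 @cluster mark imported above behaves as benchmark_test.py uses it; ExampleTest and test_example are illustrative names, not part of this patch:

    from ducktape.mark import matrix
    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test


    class ExampleTest(Test):
        # Each @cluster annotation applies to the parametrized entries
        # stacked beneath it, down to the next @cluster annotation.
        @cluster(num_nodes=5)
        @matrix(security_protocol=['PLAINTEXT', 'SSL'])
        @cluster(num_nodes=6)
        @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
        def test_example(self, security_protocol):
            pass

Throughout this patch the SASL variants are split into their own group so they can declare a larger cluster, presumably because they bring up an extra service (MiniKdc) alongside the brokers.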
-from ducktape.mark import ignore +from ducktape.mark.resource import cluster from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService -import time + class StreamsSimpleBenchmarkTest(KafkaTest): """ @@ -29,6 +29,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest): self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L) + @cluster(num_nodes=3) def test_simple_benchmark(self): """ Run simple Kafka Streams benchmark diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 18cbfb77210..38db057e565 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -17,6 +17,7 @@ import time from ducktape.mark import matrix from ducktape.mark import parametrize +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until @@ -42,9 +43,12 @@ class ConsoleConsumerTest(Test): def setUp(self): self.zk.start() + @cluster(num_nodes=3) @parametrize(security_protocol='PLAINTEXT', new_consumer=False) + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + @cluster(num_nodes=4) @parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN') - @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']) + @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL']) def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'): """Check that console consumer starts/stops properly, and that we are capturing log output.""" @@ -66,14 +70,16 @@ class ConsoleConsumerTest(Test): # Verify that log output is happening wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10, - err_msg="Timed out waiting for logging to start.") - assert line_count(node, ConsoleConsumer.LOG_FILE) > 0 + err_msg="Timed out waiting for consumer log file to exist.") + wait_until(lambda: line_count(node, ConsoleConsumer.LOG_FILE) > 0, timeout_sec=1, + backoff_sec=.25, err_msg="Timed out waiting for log entries to start.") # Verify no consumed messages assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 self.consumer.stop_node(node) + @cluster(num_nodes=4) def test_version(self): """Check that console consumer v0.8.2.X successfully starts and consumes messages.""" self.kafka.start() diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py index b33c5905827..35500935c69 100644 --- a/tests/kafkatest/sanity_checks/test_kafka_version.py +++ b/tests/kafkatest/sanity_checks/test_kafka_version.py @@ -14,6 +14,7 @@ # limitations under the License. 
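The test_lifecycle change above illustrates another recurring fix in this patch: one-shot assertions against freshly started services are replaced with wait_until polling, so a slow startup no longer fails the test spuriously. A hedged sketch of the pattern, assuming file_exists lives alongside line_count in kafkatest.utils.remote_account (line_count's location is confirmed elsewhere in this patch); await_log_output is an illustrative name:

    from ducktape.utils.util import wait_until

    from kafkatest.utils.remote_account import file_exists, line_count


    def await_log_output(node, log_file):
        # First wait for the log file to exist at all...
        wait_until(lambda: file_exists(node, log_file), timeout_sec=10,
                   err_msg="Timed out waiting for %s to exist." % log_file)
        # ...then poll for content instead of asserting once and racing startup.
        wait_until(lambda: line_count(node, log_file) > 0, timeout_sec=1,
                   backoff_sec=.25, err_msg="Timed out waiting for output in %s." % log_file)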
from ducktape.tests.test import Test +from ducktape.mark.resource import cluster from kafkatest.services.kafka import KafkaService, config_property from kafkatest.services.zookeeper import ZookeeperService @@ -32,6 +33,7 @@ class KafkaVersionTest(Test): def setUp(self): self.zk.start() + @cluster(num_nodes=2) def test_0_8_2(self): """Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster.""" self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, @@ -42,6 +44,7 @@ class KafkaVersionTest(Test): assert is_version(node, [LATEST_0_8_2]) + @cluster(num_nodes=3) def test_multi_version(self): """Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X, the other on trunk.""" diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py index 7b5946aa77d..b939f2b61ce 100644 --- a/tests/kafkatest/sanity_checks/test_performance_services.py +++ b/tests/kafkatest/sanity_checks/test_performance_services.py @@ -14,6 +14,7 @@ # limitations under the License. from ducktape.mark import parametrize +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService @@ -35,6 +36,7 @@ class PerformanceServiceTest(Test): def setUp(self): self.zk.start() + @cluster(num_nodes=5) # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, # the overhead should be manageable. @parametrize(version=str(LATEST_0_8_2), new_consumer=False) diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py index 23932f3f27c..544d7b98919 100644 --- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -15,6 +15,7 @@ from ducktape.mark import parametrize +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until @@ -44,6 +45,7 @@ class TestVerifiableProducer(Test): self.zk.start() self.kafka.start() + @cluster(num_nodes=3) @parametrize(producer_version=str(LATEST_0_8_2)) @parametrize(producer_version=str(LATEST_0_9)) @parametrize(producer_version=str(TRUNK)) diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 473eb0be482..45140fc1517 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -304,7 +304,8 @@ class VerifiableConnector(object): self.logger.debug("Ignoring unparseable line: %s", line) continue # Filter to only ones matching our name to support multiple verifiable producers - if data['name'] != self.name: continue + if data['name'] != self.name: + continue data['node'] = node records.append(data) return records diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 050ea6dedb4..6984fc970ce 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -15,10 +15,9 @@ import itertools import os -import subprocess from ducktape.services.background_thread import BackgroundThreadService -from ducktape.utils.util import wait_until +from ducktape.cluster.remoteaccount import RemoteCommandError from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin @@ -211,7 +210,7 @@ class 
ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except (subprocess.CalledProcessError, ValueError) as e: + except (RemoteCommandError, ValueError) as e: return [] def alive(self, node): diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index c79f8c8debf..f773d8d6843 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -18,11 +18,11 @@ import json import os.path import re import signal -import subprocess import time from ducktape.services.service import Service from ducktape.utils.util import wait_until +from ducktape.cluster.remoteaccount import RemoteCommandError from config import KafkaConfig from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin @@ -121,8 +121,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @property def security_config(self): - return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, - zk_sasl = self.zk.zk_sasl, + return SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol, + zk_sasl=self.zk.zk_sasl, client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism) def open_port(self, protocol): @@ -208,7 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd)) with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor: node.account.ssh(cmd) - monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup") + monitor.wait_until("Kafka Server.*started", timeout_sec=30, backoff_sec=.25, err_msg="Kafka server didn't finish startup") self.start_jmx_tool(self.idx(node), node) if len(self.pids(node)) == 0: @@ -221,7 +221,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except (subprocess.CalledProcessError, ValueError) as e: + except (RemoteCommandError, ValueError) as e: return [] def signal_node(self, node, sig=signal.SIGTERM): diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py index b25d8be92e4..29a42029cc2 100644 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -34,7 +34,8 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService): self.topic = topic self.max_messages = max_messages self.security_protocol = security_protocol - self.security_config = SecurityConfig(security_protocol) + self.security_config = SecurityConfig(self.context, security_protocol) + self.stop_timeout_sec = 30 def _worker(self, idx, node): cmd = self.start_cmd(node) diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py index 14af4cf5971..c056705ac6e 100644 --- a/tests/kafkatest/services/mirror_maker.py +++ b/tests/kafkatest/services/mirror_maker.py @@ -14,10 +14,10 @@ # limitations under the License. 
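The ConsoleConsumer hunk above is the first of many identical substitutions in this patch: with ducktape 0.6.0, failed remote commands surface as RemoteCommandError rather than subprocess.CalledProcessError, so every pids() helper swaps its except clause. A condensed sketch of the shared idiom; the standalone pids/alive functions are illustrative (in the services they are methods):

    from ducktape.cluster.remoteaccount import RemoteCommandError


    def pids(node, keyword):
        # Return PIDs of matching java processes on node, or [] if none.
        try:
            cmd = "ps ax | grep -i %s | grep java | grep -v grep | awk '{print $1}'" % keyword
            # callback=int parses each captured line; non-numeric output raises ValueError
            return [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
        except (RemoteCommandError, ValueError):
            return []


    def alive(node, keyword):
        return len(pids(node, keyword)) > 0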
 import os
-import subprocess

 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
+from ducktape.cluster.remoteaccount import RemoteCommandError

 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin

@@ -145,7 +145,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
             cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except (subprocess.CalledProcessError, ValueError) as e:
+        except (RemoteCommandError, ValueError):
             return []

     def alive(self, node):
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 19ca5fdc67b..e71040bb5de 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.utils.util import wait_until
+
 class JmxMixin(object):
     """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.

@@ -31,12 +34,19 @@ class JmxMixin(object):
         self.maximum_jmx_value = {} # map from object_attribute_name to maximum value observed over time
         self.average_jmx_value = {} # map from object_attribute_name to average value observed over time

+        self.jmx_tool_log = "/mnt/jmx_tool.log"
+
     def clean_node(self, node):
         node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
+        node.account.ssh("rm -rf %s" % self.jmx_tool_log, allow_fail=False)

     def start_jmx_tool(self, idx, node):
-        if self.started[idx-1] or self.jmx_object_names is None:
+        if self.jmx_object_names is None:
+            self.logger.debug("%s: Not starting jmx tool because no jmx objects are defined" % node.account)
+            return
+
+        if self.started[idx-1]:
+            self.logger.debug("%s: jmx tool has been started already on this node" % node.account)
             return

         cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
@@ -45,31 +55,43 @@ class JmxMixin(object):
             cmd += " --object-name %s" % jmx_object_name
         for jmx_attribute in self.jmx_attributes:
             cmd += " --attributes %s" % jmx_attribute
-        cmd += " | tee -a /mnt/jmx_tool.log"
-
-        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
-        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
-        jmx_output.next()
+        cmd += " >> %s &" % self.jmx_tool_log
+        self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
+        node.account.ssh(cmd, allow_fail=False)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=5, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True

+    def _jmx_has_output(self, node):
+        """Helper used as a proxy to determine whether the jmx tool is running by checking whether jmx_tool_log contains output."""
+        try:
+            node.account.ssh("test -z \"$(cat %s)\"" % self.jmx_tool_log, allow_fail=False)
+            return False
+        except RemoteCommandError:
+            return True
+
     def read_jmx_output(self, idx, node):
-        if self.started[idx-1] == False:
+        if not self.started[idx-1]:
             return

         object_attribute_names = []

-        cmd = "cat /mnt/jmx_tool.log"
+        cmd = "cat %s" % self.jmx_tool_log
         self.logger.debug("Read jmx output %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
+        lines = [line for line in
node.account.ssh_capture(cmd, allow_fail=False)] + assert len(lines) > 1, "There don't appear to be any samples in the jmx tool log: %s" % lines + + for line in lines: if "time" in line: object_attribute_names = line.strip()[1:-1].split("\",\"")[1:] continue stats = [float(field) for field in line.split(',')] time_sec = int(stats[0]/1000) - self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)} + self.jmx_stats[idx-1][time_sec] = {name: stats[i+1] for i, name in enumerate(object_attribute_names)} # do not calculate average and maximum of jmx stats until we have read output from all nodes + # If the service is multithreaded, this means that the results will be aggregated only when the last + # service finishes if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats): return diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 7a0ccdd5182..1113e0db65c 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -14,9 +14,9 @@ # limitations under the License. import os -import subprocess - +import time from ducktape.utils.util import wait_until +from ducktape.cluster.remoteaccount import RemoteCommandError from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME from kafkatest.services.monitor.jmx import JmxMixin @@ -118,7 +118,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): cmd = "jps | grep -i ProducerPerformance | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except (subprocess.CalledProcessError, ValueError) as e: + except (RemoteCommandError, ValueError) as e: return [] def alive(self, node): @@ -136,6 +136,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): self.logger.debug("Producer performance %d command: %s", idx, cmd) # start ProducerPerformance process + start = time.time() producer_output = node.account.ssh_capture(cmd) wait_until(lambda: self.alive(node), timeout_sec=20, err_msg="ProducerPerformance failed to start") # block until there is at least one line of output @@ -144,7 +145,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): raise Exception("No output from ProducerPerformance") self.start_jmx_tool(idx, node) - wait_until(lambda: not self.alive(node), timeout_sec=1200, err_msg="ProducerPerformance failed to finish") + wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish") + elapsed = time.time() - start + self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed) + self.read_jmx_output(idx, node) # parse producer output from file diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py index b7d6b892a43..0af13f9b349 100644 --- a/tests/kafkatest/services/performance/streams_performance.py +++ b/tests/kafkatest/services/performance/streams_performance.py @@ -78,7 +78,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service): def wait(self): for node in self.nodes: for pid in self.pids(node): - wait_until(lambda: not node.account.alive(pid), timeout_sec=600, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit") + wait_until(lambda: not node.account.alive(pid), 
timeout_sec=600, backoff_sec=1, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit") def clean_node(self, node): node.account.kill_process("streams", clean_shutdown=False, allow_fail=True) diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index 2f29d163109..a2753fdc3db 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -36,7 +36,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService): self.topic = topic self.report_interval_ms = report_interval_ms self.security_protocol = security_protocol - self.security_config = SecurityConfig(security_protocol) + self.security_config = SecurityConfig(self.context, security_protocol) self.partition_lag = {} self.stop_timeout_sec = stop_timeout_sec @@ -65,6 +65,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService): topic_partition = topic + ',' + str(partition) lag = self.partition_lag.get(topic_partition, -1) self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag)) + return lag def start_cmd(self, node): diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py index 3189ddc0aa2..b3cbeaef530 100644 --- a/tests/kafkatest/services/security/minikdc.py +++ b/tests/kafkatest/services/security/minikdc.py @@ -14,6 +14,7 @@ # limitations under the License. import os +import random import uuid from io import open from os import remove, close @@ -39,14 +40,48 @@ class MiniKdc(KafkaPathResolverMixin, Service): KEYTAB_FILE = "/mnt/minikdc/keytab" KRB5CONF_FILE = "/mnt/minikdc/krb5.conf" LOG_FILE = "/mnt/minikdc/minikdc.log" - LOCAL_KEYTAB_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_keytab" - LOCAL_KRB5CONF_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_krb5.conf" + + LOCAL_KEYTAB_FILE = None + LOCAL_KRB5CONF_FILE = None + + @staticmethod + def _set_local_keytab_file(local_scratch_dir): + """Set MiniKdc.LOCAL_KEYTAB_FILE exactly once per test. + + LOCAL_KEYTAB_FILE is currently used like a global variable to provide a mechanism to share the + location of the local keytab file among all services which might need it. + + Since individual ducktape tests are each run in a subprocess forked from the ducktape main process, + class variables set at class load time are duplicated between test processes. This leads to collisions + if test subprocesses are run in parallel, so we defer setting these class variables until after the test itself + begins to run. + """ + if MiniKdc.LOCAL_KEYTAB_FILE is None: + MiniKdc.LOCAL_KEYTAB_FILE = os.path.join(local_scratch_dir, "keytab") + return MiniKdc.LOCAL_KEYTAB_FILE + + @staticmethod + def _set_local_krb5conf_file(local_scratch_dir): + """Set MiniKdc.LOCAL_KRB5CONF_FILE exactly once per test. + + See _set_local_keytab_file for details why we do this. 
+ """ + + if MiniKdc.LOCAL_KRB5CONF_FILE is None: + MiniKdc.LOCAL_KRB5CONF_FILE = os.path.join(local_scratch_dir, "krb5conf") + return MiniKdc.LOCAL_KRB5CONF_FILE def __init__(self, context, kafka_nodes, extra_principals=""): super(MiniKdc, self).__init__(context, 1) self.kafka_nodes = kafka_nodes self.extra_principals = extra_principals + # context.local_scratch_dir uses a ducktape feature: + # each test_context object has a unique local scratch directory which is available for the duration of the test + # which is automatically garbage collected after the test finishes + MiniKdc._set_local_keytab_file(context.local_scratch_dir) + MiniKdc._set_local_krb5conf_file(context.local_scratch_dir) + def replace_in_file(self, file_path, pattern, subst): fh, abs_path = mkstemp() with open(abs_path, 'w') as new_file: @@ -80,8 +115,8 @@ class MiniKdc(KafkaPathResolverMixin, Service): node.account.ssh(cmd) monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup") - node.account.scp_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE) - node.account.scp_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE) + node.account.copy_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE) + node.account.copy_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE) # KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname) diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index 665c4b05410..9b29217bf72 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import atexit import os import subprocess from tempfile import mkdtemp @@ -22,21 +21,23 @@ from ducktape.template import TemplateRenderer from kafkatest.services.security.minikdc import MiniKdc import itertools + class SslStores(object): - def __init__(self): - self.ca_and_truststore_dir = mkdtemp(dir="/tmp") - self.ca_crt_path = os.path.join(self.ca_and_truststore_dir, "test.ca.crt") - self.ca_jks_path = os.path.join(self.ca_and_truststore_dir, "test.ca.jks") + def __init__(self, local_scratch_dir): + self.ca_crt_path = os.path.join(local_scratch_dir, "test.ca.crt") + self.ca_jks_path = os.path.join(local_scratch_dir, "test.ca.jks") self.ca_passwd = "test-ca-passwd" - self.truststore_path = os.path.join(self.ca_and_truststore_dir, "test.truststore.jks") + self.truststore_path = os.path.join(local_scratch_dir, "test.truststore.jks") self.truststore_passwd = "test-ts-passwd" self.keystore_passwd = "test-ks-passwd" self.key_passwd = "test-key-passwd" # Allow upto one hour of clock skew between host and VMs self.startdate = "-1H" - # Register rmtree to run on exit - atexit.register(rmtree, self.ca_and_truststore_dir) + + for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]: + if os.path.exists(file): + os.remove(file) def generate_ca(self): """ @@ -69,7 +70,7 @@ class SslStores(object): self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s -startdate %s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node), self.startdate)) self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path)) self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path)) - node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH) + node.account.copy_to(ks_path, SecurityConfig.KEYSTORE_PATH) rmtree(ks_dir) def hostname(self, node): @@ -79,9 +80,10 @@ class SslStores(object): def runcmd(self, cmd): proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - proc.communicate() + stdout, stderr = proc.communicate() + if proc.returncode != 0: - raise subprocess.CalledProcessError(proc.returncode, cmd) + raise RuntimeError("Command '%s' returned non-zero exit status %d: %s" % (cmd, proc.returncode, stdout)) class SecurityConfig(TemplateRenderer): @@ -99,11 +101,10 @@ class SecurityConfig(TemplateRenderer): KRB5CONF_PATH = "/mnt/security/krb5.conf" KEYTAB_PATH = "/mnt/security/keytab" - ssl_stores = SslStores() - ssl_stores.generate_ca() - ssl_stores.generate_truststore() + # This is initialized only when the first instance of SecurityConfig is created + ssl_stores = None - def __init__(self, security_protocol=None, interbroker_security_protocol=None, + def __init__(self, context, security_protocol=None, interbroker_security_protocol=None, client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI, zk_sasl=False, template_props=""): """ @@ -114,6 +115,15 @@ class SecurityConfig(TemplateRenderer): template properties either, PLAINTEXT is used as default. 
""" + self.context = context + if not SecurityConfig.ssl_stores: + # This generates keystore/trustore files in a local scratch directory which gets + # automatically destroyed after the test is run + # Creating within the scratch directory allows us to run tests in parallel without fear of collision + SecurityConfig.ssl_stores = SslStores(context.local_scratch_dir) + SecurityConfig.ssl_stores.generate_ca() + SecurityConfig.ssl_stores.generate_truststore() + if security_protocol is None: security_protocol = self.get_property('security.protocol', template_props) if security_protocol is None: @@ -140,13 +150,12 @@ class SecurityConfig(TemplateRenderer): 'sasl.kerberos.service.name' : 'kafka' } - def client_config(self, template_props=""): - return SecurityConfig(self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props) + return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props) def setup_ssl(self, node): node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False) - node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH) + node.account.copy_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH) SecurityConfig.ssl_stores.generate_and_copy_keystore(node) def setup_sasl(self, node): @@ -162,8 +171,8 @@ class SecurityConfig(TemplateRenderer): enabled_sasl_mechanisms=self.enabled_sasl_mechanisms) node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf) if self.has_sasl_kerberos: - node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH) - node.account.scp_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH) + node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH) + node.account.copy_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH) def setup_node(self, node): if self.has_ssl: diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 9c6abddaa60..c593e2a79bd 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -16,9 +16,9 @@ import json import os import signal -import subprocess from ducktape.services.background_thread import BackgroundThreadService +from ducktape.cluster.remoteaccount import RemoteCommandError from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import TopicPartition @@ -243,7 +243,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService): cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except (subprocess.CalledProcessError, ValueError) as e: + except (RemoteCommandError, ValueError) as e: return [] def try_parse_json(self, string): diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index dbdf71f36c2..205143e4118 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -16,19 +16,22 @@ import json import os import signal -import subprocess import time from ducktape.services.background_thread import BackgroundThreadService +from ducktape.cluster.remoteaccount import RemoteCommandError +from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import 
KafkaPathResolverMixin, TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME from kafkatest.utils import is_int, is_int_with_prefix from kafkatest.version import TRUNK, LATEST_0_8_2 +from kafkatest.utils.remote_account import line_count class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): PERSISTENT_ROOT = "/mnt/verifiable_producer" STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout") + STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr") LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log") LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") @@ -38,6 +41,9 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): "verifiable_producer_stdout": { "path": STDOUT_CAPTURE, "collect_default": False}, + "verifiable_producer_stderr": { + "path": STDERR_CAPTURE, + "collect_default": False}, "verifiable_producer_log": { "path": LOG_FILE, "collect_default": True} @@ -114,37 +120,63 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): self.produced_count[idx] = 0 last_produced_time = time.time() prev_msg = None - for line in node.account.ssh_capture(cmd): - line = line.strip() + node.account.ssh(cmd) - data = self.try_parse_json(line) - if data is not None: + # Ensure that STDOUT_CAPTURE exists before try to read from it + # Note that if max_messages is configured, it's possible for the process to exit before this + # wait_until condition is checked + start = time.time() + wait_until(lambda: node.account.isfile(VerifiableProducer.STDOUT_CAPTURE) and + line_count(node, VerifiableProducer.STDOUT_CAPTURE) > 0, + timeout_sec=10, err_msg="%s: VerifiableProducer took too long to start" % node.account) + self.logger.debug("%s: VerifiableProducer took %s seconds to start" % (node.account, time.time() - start)) - with self.lock: - if data["name"] == "producer_send_error": - data["node"] = idx - self.not_acked_values.append(self.message_validator(data["value"])) - self.produced_count[idx] += 1 + with node.account.open(VerifiableProducer.STDOUT_CAPTURE, 'r') as f: + while True: + line = f.readline() + if line == '' and not self.alive(node): + # The process is gone, and we've reached the end of the output file, so we don't expect + # any more output to appear in the STDOUT_CAPTURE file + break - elif data["name"] == "producer_send_success": - self.acked_values.append(self.message_validator(data["value"])) - self.produced_count[idx] += 1 + line = line.strip() - # Log information if there is a large gap between successively acknowledged messages - t = time.time() - time_delta_sec = t - last_produced_time - if time_delta_sec > 2 and prev_msg is not None: - self.logger.debug( - "Time delta between successively acked messages is large: " + - "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data))) + data = self.try_parse_json(line) + if data is not None: - last_produced_time = t - prev_msg = data + with self.lock: + if data["name"] == "producer_send_error": + data["node"] = idx + self.not_acked_values.append(self.message_validator(data["value"])) + self.produced_count[idx] += 1 - elif data["name"] == "shutdown_complete": - if node in self.clean_shutdown_nodes: - raise Exception("Unexpected shutdown event from producer, already shutdown. 
Producer index: %d" % idx) - self.clean_shutdown_nodes.add(node) + elif data["name"] == "producer_send_success": + self.acked_values.append(self.message_validator(data["value"])) + self.produced_count[idx] += 1 + + # Log information if there is a large gap between successively acknowledged messages + t = time.time() + time_delta_sec = t - last_produced_time + if time_delta_sec > 2 and prev_msg is not None: + self.logger.debug( + "Time delta between successively acked messages is large: " + + "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data))) + + last_produced_time = t + prev_msg = data + + elif data["name"] == "shutdown_complete": + if node in self.clean_shutdown_nodes: + raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx) + self.clean_shutdown_nodes.add(node) + + def _has_output(self, node): + """Helper used as a proxy to determine whether jmx is running by that jmx_tool_log contains output.""" + try: + node.account.ssh("test -z \"$(cat %s)\"" % VerifiableProducer.STDOUT_CAPTURE, allow_fail=False) + return False + except RemoteCommandError: + return True def start_cmd(self, node, idx): cmd = "" @@ -171,10 +203,10 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): if self.message_validator == is_int_with_prefix: cmd += " --value-prefix %s" % str(idx) if self.acks is not None: - cmd += " --acks %s\n" % str(self.acks) + cmd += " --acks %s " % str(self.acks) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE - cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) + cmd += " 2>> %s 1>> %s &" % (VerifiableProducer.STDERR_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) return cmd def kill_node(self, node, clean_shutdown=True, allow_fail=False): @@ -190,7 +222,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except (subprocess.CalledProcessError, ValueError) as e: + except (RemoteCommandError, ValueError) as e: return [] def alive(self, node): diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index 201988937e1..8d38d485351 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -15,10 +15,11 @@ import re -import subprocess import time from ducktape.services.service import Service +from ducktape.utils.util import wait_until +from ducktape.cluster.remoteaccount import RemoteCommandError from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.security.security_config import SecurityConfig @@ -46,7 +47,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service): @property def security_config(self): - return SecurityConfig(zk_sasl=self.zk_sasl) + return SecurityConfig(self.context, zk_sasl=self.zk_sasl) @property def security_system_properties(self): @@ -85,7 +86,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service): cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except (subprocess.CalledProcessError, ValueError) as e: + except (RemoteCommandError, ValueError) as e: return [] def alive(self, node): @@ -95,6 +96,7 @@ class 
ZookeeperService(KafkaPathResolverMixin, Service): idx = self.idx(node) self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname)) node.account.kill_process("zookeeper", allow_fail=False) + wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.") def clean_node(self, node): self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname) diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py index 0de53aee54f..9301de4a1bb 100644 --- a/tests/kafkatest/tests/client/compression_test.py +++ b/tests/kafkatest/tests/client/compression_test.py @@ -15,6 +15,7 @@ from ducktape.mark import parametrize from ducktape.utils.util import wait_until +from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -23,6 +24,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int_with_prefix + class CompressionTest(ProduceConsumeValidateTest): """ These tests validate produce / consume for compressed topics. @@ -51,6 +53,7 @@ class CompressionTest(ProduceConsumeValidateTest): # Override this since we're adding services outside of the constructor return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers + @cluster(num_nodes=7) @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True) @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False) def test_compressed_topic(self, compression_types, new_consumer): diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py index 3cd3c7c955c..e5904b1f153 100644 --- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.utils.util import wait_until +from ducktape.mark.resource import cluster + from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest from kafkatest.services.kafka import TopicPartition @@ -43,6 +44,7 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest): frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]), frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])]) + @cluster(num_nodes=4) def rolling_update_test(self): """ Verify rolling updates of partition assignment strategies works correctly. 
In this diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index 534f65cbc3c..a68e23ec803 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -15,12 +15,14 @@ from ducktape.mark import matrix from ducktape.utils.util import wait_until +from ducktape.mark.resource import cluster from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest from kafkatest.services.kafka import TopicPartition import signal + class OffsetValidationTest(VerifiableConsumerTest): TOPIC = "test_topic" NUM_PARTITIONS = 1 @@ -72,6 +74,7 @@ class OffsetValidationTest(VerifiableConsumerTest): self.mark_for_collect(consumer, 'verifiable_consumer_stdout') return consumer + @cluster(num_nodes=7) def test_broker_rolling_bounce(self): """ Verify correct consumer behavior when the brokers are consecutively restarted. @@ -112,6 +115,7 @@ class OffsetValidationTest(VerifiableConsumerTest): assert consumer.current_position(partition) == consumer.total_consumed(), \ "Total consumed records did not match consumed position" + @cluster(num_nodes=7) @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"]) def test_consumer_bounce(self, clean_shutdown, bounce_mode): """ @@ -152,6 +156,7 @@ class OffsetValidationTest(VerifiableConsumerTest): assert consumer.current_position(partition) <= consumer.total_consumed(), \ "Current position greater than the total number of consumed records" + @cluster(num_nodes=7) @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False]) def test_consumer_failure(self, clean_shutdown, enable_autocommit): partition = TopicPartition(self.TOPIC, 0) @@ -194,7 +199,7 @@ class OffsetValidationTest(VerifiableConsumerTest): assert consumer.last_commit(partition) == consumer.current_position(partition), \ "Last committed offset did not match last consumed position" - + @cluster(num_nodes=7) @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False]) def test_broker_failure(self, clean_shutdown, enable_autocommit): partition = TopicPartition(self.TOPIC, 0) @@ -229,6 +234,7 @@ class OffsetValidationTest(VerifiableConsumerTest): assert consumer.last_commit(partition) == consumer.current_position(partition), \ "Last committed offset did not match last consumed position" + @cluster(num_nodes=7) def test_group_consumption(self): """ Verifies correct group rebalance behavior as consumers are started and stopped. 
@@ -277,6 +283,7 @@ class AssignmentValidationTest(VerifiableConsumerTest): self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }, }) + @cluster(num_nodes=6) @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor", "org.apache.kafka.clients.consumer.RoundRobinAssignor"]) def test_valid_assignment(self, assignment_strategy): @@ -294,4 +301,4 @@ class AssignmentValidationTest(VerifiableConsumerTest): consumer.start_node(node) self.await_members(consumer, num_started) assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()) - + diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py index a57c04b4721..edcead2abf0 100644 --- a/tests/kafkatest/tests/client/message_format_change_test.py +++ b/tests/kafkatest/tests/client/message_format_change_test.py @@ -14,6 +14,7 @@ from ducktape.mark import parametrize from ducktape.utils.util import wait_until +from ducktape.mark.resource import cluster from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka import KafkaService @@ -55,7 +56,8 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest): lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, timeout_sec=120, backoff_sec=1, err_msg="Producer did not produce all messages in reasonable amount of time")) - + + @cluster(num_nodes=10) @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK)) @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9)) def test_compatibility(self, producer_version, consumer_version): diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 1d31569c07f..baed8373a0a 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -15,6 +15,7 @@ from ducktape.tests.test import Test from ducktape.mark import matrix, parametrize +from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -124,6 +125,7 @@ class QuotaTest(Test): """Override this since we're adding services outside of the constructor""" return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers + @cluster(num_nodes=5) @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False]) @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2) def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1): diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index ee0a222d4d0..f49bb5d7538 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -14,18 +14,22 @@ # limitations under the License. 
from ducktape.tests.test import Test +from ducktape.mark.resource import cluster +from ducktape.utils.util import wait_until +from ducktape.mark import matrix, parametrize +from ducktape.cluster.remoteaccount import RemoteCommandError from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig -from ducktape.utils.util import wait_until -from ducktape.mark import matrix -import subprocess, itertools, time + +import itertools, time from collections import Counter, namedtuple import operator + class ConnectDistributedTest(Test): """ Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on @@ -139,6 +143,7 @@ class ConnectDistributedTest(Test): status = self._connector_status(connector.name, node) return self._task_has_state(task_id, status, 'RUNNING') + @cluster(num_nodes=5) def test_restart_failed_connector(self): self.setup_services() self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) @@ -155,6 +160,7 @@ class ConnectDistributedTest(Test): wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10, err_msg="Failed to see connector transition to the RUNNING state") + @cluster(num_nodes=5) @matrix(connector_type=["source", "sink"]) def test_restart_failed_task(self, connector_type): self.setup_services() @@ -178,7 +184,7 @@ class ConnectDistributedTest(Test): wait_until(lambda: self.task_is_running(connector, task_id), timeout_sec=10, err_msg="Failed to see task transition to the RUNNING state") - + @cluster(num_nodes=5) def test_pause_and_resume_source(self): """ Verify that source connectors stop producing records when paused and begin again after @@ -217,6 +223,7 @@ class ConnectDistributedTest(Test): wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30, err_msg="Failed to produce messages after resuming source connector") + @cluster(num_nodes=5) def test_pause_and_resume_sink(self): """ Verify that sink connectors stop consuming records when paused and begin again after @@ -259,7 +266,7 @@ class ConnectDistributedTest(Test): wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30, err_msg="Failed to consume messages after resuming source connector") - + @cluster(num_nodes=5) def test_pause_state_persistent(self): """ Verify that paused state is preserved after a cluster restart. @@ -284,7 +291,10 @@ class ConnectDistributedTest(Test): wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30, err_msg="Failed to see connector startup in PAUSED state") - @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL]) + @cluster(num_nodes=5) + @parametrize(security_protocol=SecurityConfig.PLAINTEXT) + @cluster(num_nodes=6) + @parametrize(security_protocol=SecurityConfig.SASL_SSL) def test_file_source_and_sink(self, security_protocol): """ Tests that a basic file connector works across clean rolling bounces. 
This validates that the connector is @@ -315,7 +325,7 @@ class ConnectDistributedTest(Test): node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE) wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file") - + @cluster(num_nodes=5) @matrix(clean=[True, False]) def test_bounce(self, clean): """ @@ -424,8 +434,6 @@ class ConnectDistributedTest(Test): assert success, "Found validation errors:\n" + "\n ".join(errors) - - def _validate_file_output(self, input): input_set = set(input) # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled. @@ -437,8 +445,8 @@ class ConnectDistributedTest(Test): def _file_contents(self, node, file): try: - # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of + # Convert to a list here or the RemoteCommandError may be returned during a call to the generator instead of # immediately return list(node.account.ssh_capture("cat " + file)) - except subprocess.CalledProcessError: + except RemoteCommandError: return [] diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 70bc32c9d5b..098790b4c53 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -16,7 +16,9 @@ from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.connect import ConnectDistributedService, ConnectRestError from ducktape.utils.util import wait_until -import subprocess +from ducktape.mark.resource import cluster +from ducktape.cluster.remoteaccount import RemoteCommandError + import json import itertools @@ -57,6 +59,7 @@ class ConnectRestApiTest(KafkaTest): self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE]) + @cluster(num_nodes=4) def test_rest_api(self): # Template parameters self.key_converter = "org.apache.kafka.connect.json.JsonConverter" @@ -171,10 +174,10 @@ class ConnectRestApiTest(KafkaTest): def file_contents(self, node, file): try: - # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of + # Convert to a list here or the RemoteCommandError may be returned during a call to the generator instead of # immediately return list(node.account.ssh_capture("cat " + file)) - except subprocess.CalledProcessError: + except RemoteCommandError: return [] def _config_dict_from_props(self, connector_props): diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 83acb4a9196..9436119f886 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -14,15 +14,20 @@ # limitations under the License. 
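Both _file_contents above and file_contents in connect_rest_test.py depend on the subtlety their comments call out: ssh_capture returns a lazy generator, so a failing remote cat would otherwise raise only when the caller iterates, outside the try block. A minimal sketch of why the list() matters; read_remote_file is an illustrative name:

    from ducktape.cluster.remoteaccount import RemoteCommandError


    def read_remote_file(node, path):
        try:
            # list() drains the generator right here, so a RemoteCommandError
            # from a missing file is caught by this except clause...
            return list(node.account.ssh_capture("cat " + path))
        except RemoteCommandError:
            # ...and a missing output file simply reads as "no lines yet".
            return []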
from ducktape.tests.test import Test +from ducktape.mark.resource import cluster +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize, matrix +from ducktape.cluster.remoteaccount import RemoteCommandError from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.connect import ConnectStandaloneService from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig -from ducktape.utils.util import wait_until -from ducktape.mark import parametrize, matrix -import hashlib, subprocess, json + +import hashlib +import json + class ConnectStandaloneFileTest(Test): """ @@ -58,10 +63,13 @@ class ConnectStandaloneFileTest(Test): self.zk = ZookeeperService(test_context, self.num_zk) + @cluster(num_nodes=5) @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True) @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False) @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None) - @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL]) + @parametrize(security_protocol=SecurityConfig.PLAINTEXT) + @cluster(num_nodes=6) + @parametrize(security_protocol=SecurityConfig.SASL_SSL) def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'): """ Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. Includes @@ -85,7 +93,6 @@ class ConnectStandaloneFileTest(Test): self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000) - self.zk.start() self.kafka.start() @@ -118,5 +125,5 @@ class ConnectStandaloneFileTest(Test): try: output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0] return output_hash == hashlib.md5(value).hexdigest() - except subprocess.CalledProcessError: + except RemoteCommandError: return False diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py index d6a0a12bcd7..f3931ec9864 100644 --- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py +++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py @@ -14,6 +14,7 @@ from ducktape.mark import parametrize from ducktape.utils.util import wait_until +from ducktape.mark.resource import cluster from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka import KafkaService @@ -43,6 +44,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): self.num_consumers = 1 self.messages_per_producer = 1000 + @cluster(num_nodes=6) @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None) @@ -54,7 +56,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], 
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index d6a0a12bcd7..f3931ec9864 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -14,6 +14,7 @@
 
 from ducktape.mark import parametrize
 from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
@@ -43,6 +44,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
         self.num_consumers = 1
         self.messages_per_producer = 1000
 
+    @cluster(num_nodes=6)
     @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
     @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
     @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
@@ -54,7 +56,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
     @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
     @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
     def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
-        
+
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
index c3f59d9b329..c03022a6305 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -17,6 +17,7 @@ from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -28,6 +29,7 @@ import re
 
 TOPIC = "topic-consumer-group-command"
 
+
 class ConsumerGroupCommandTest(Test):
     """
     Tests ConsumerGroupCommand
@@ -89,6 +91,7 @@ class ConsumerGroupCommandTest(Test):
 
         self.consumer.stop()
 
+    @cluster(num_nodes=3)
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
     def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
         """
@@ -97,6 +100,7 @@ class ConsumerGroupCommandTest(Test):
         """
         self.setup_and_verify(security_protocol)
 
+    @cluster(num_nodes=3)
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
     def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
         """
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
index 38bd9dc46d8..e45365d32cf 100644
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ b/tests/kafkatest/tests/core/get_offset_shell_test.py
@@ -16,8 +16,9 @@
 
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
+from ducktape.mark.resource import cluster
 
+from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.console_consumer import ConsoleConsumer
@@ -28,6 +29,7 @@ MAX_MESSAGES = 100
 NUM_PARTITIONS = 1
 REPLICATION_FACTOR = 1
 
+
 class GetOffsetShellTest(Test):
     """
     Tests GetOffsetShell tool
@@ -44,7 +46,6 @@ class GetOffsetShellTest(Test):
 
         self.zk = ZookeeperService(test_context, self.num_zk)
 
-
     def setUp(self):
         self.zk.start()
 
@@ -69,6 +70,7 @@ class GetOffsetShellTest(Test):
                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
         self.consumer.start()
 
+    @cluster(num_nodes=4)
     def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
         """
         Tests if GetOffsetShell is getting offsets correctly
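The num_nodes values read as straight head counts of the services each test starts. For test_get_offset_shell above, a plausible accounting (assumed from the services the test constructs, not stated in the patch):

    # 1 ZooKeeper node + 1 Kafka broker + 1 VerifiableProducer node
    # + 1 ConsoleConsumer node = 4 nodes reserved from the shared cluster
    ZK, BROKERS, CLIENTS = 1, 1, 2
    assert ZK + BROKERS + CLIENTS == 4   # matches @cluster(num_nodes=4)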
diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py
index afb1972eb15..ce86a60cc11 100644
--- a/tests/kafkatest/tests/core/mirror_maker_test.py
+++ b/tests/kafkatest/tests/core/mirror_maker_test.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix, ignore
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -110,8 +111,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
                    err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
 
+    @cluster(num_nodes=7)
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], new_consumer=[True])
+    @cluster(num_nodes=8)
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
     def test_simple_end_to_end(self, security_protocol, new_consumer):
         """
         Test end-to-end behavior under non-failure conditions.
@@ -140,8 +144,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
         self.mirror_maker.stop()
 
+    @cluster(num_nodes=7)
     @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
-    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL'])
+    @cluster(num_nodes=8)
+    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
     def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
         """
         Test end-to-end behavior under failure conditions.
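The wait_until call kept in the mirror maker hunk above is the polling idiom these tests standardize on: block until a predicate holds, fail with a descriptive message on timeout. A minimal sketch (the helper name is illustrative):

    from ducktape.utils.util import wait_until

    def await_acks(producer, n_messages, timeout_sec=10):
        # Polls the predicate until it returns True; raises TimeoutError
        # carrying err_msg if it never holds within timeout_sec.
        wait_until(lambda: producer.num_acked > n_messages,
                   timeout_sec=timeout_sec,
                   err_msg="Producer failed to produce %d messages "
                           "in a reasonable amount of time." % n_messages)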
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 850e2aae277..fef57d1e5b5 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.zookeeper import ZookeeperService
@@ -24,6 +25,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 import random
 
+
 class ReassignPartitionsTest(ProduceConsumeValidateTest):
     """
     These tests validate partition reassignment.
@@ -86,6 +88,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
         # Wait until finished or timeout
         wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
 
+    @cluster(num_nodes=7)
     @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
     @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
     def test_reassign_partitions(self, bounce_brokers, security_protocol):
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index f8150341c49..a95e9e5c289 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -16,6 +16,7 @@
 
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -118,7 +119,7 @@ class ReplicationTest(ProduceConsumeValidateTest):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-
+    @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
@@ -146,7 +147,7 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.kafka.interbroker_security_protocol = security_protocol
         self.kafka.client_sasl_mechanism = client_sasl_mechanism
         self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
-        new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True 
+        new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index 51b2e609e82..a21e845649d 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -20,8 +20,8 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
 from kafkatest.services.security.kafka_acls import ACLs
 import time
 
@@ -102,7 +102,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         # Bounce again with ACLs for new mechanism
         self.set_authorizer_and_bounce(security_protocol, security_protocol)
 
-    @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    @cluster(num_nodes=8)
+    @matrix(client_protocol=["SSL"])
+    @cluster(num_nodes=9)
+    @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
     def test_rolling_upgrade_phase_one(self, client_protocol):
         """
         Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
@@ -123,6 +126,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.create_producer_and_consumer()
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
+    @cluster(num_nodes=8)
     @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
     def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
         """
@@ -143,6 +147,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
         self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
 
+    @cluster(num_nodes=9)
     @parametrize(new_client_sasl_mechanism='PLAIN')
     def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
         """
@@ -166,6 +171,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.create_producer_and_consumer()
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
+    @cluster(num_nodes=8)
     @parametrize(new_sasl_mechanism='PLAIN')
     def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
         """
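For context on the phase-one and phase-two tests above: the upgrade stays available because configuration lands one broker at a time. A sketch of the rolling bounce these tests rely on (condensed; the real helpers live in the test class):

    def rolling_bounce(kafka):
        # Restart brokers one at a time so the rest of the cluster keeps
        # serving while each node picks up the new listener settings.
        for node in kafka.nodes:
            kafka.stop_node(node)
            kafka.start_node(node)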
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index b6bc656effc..4edbcff8937 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -13,7 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from ducktape.errors import TimeoutError
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -23,20 +27,19 @@ from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.security.security_config import SslStores
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-import time
 
 class TestSslStores(SslStores):
-    def __init__(self):
-        super(TestSslStores, self).__init__()
-        self.invalid_hostname = False
+    def __init__(self, local_scratch_dir, valid_hostname=True):
+        super(TestSslStores, self).__init__(local_scratch_dir)
+        self.valid_hostname = valid_hostname
         self.generate_ca()
         self.generate_truststore()
 
     def hostname(self, node):
-        if (self.invalid_hostname):
-            return "invalidhost"
-        else:
+        if self.valid_hostname:
             return super(TestSslStores, self).hostname(node)
+        else:
+            return "invalidhostname"
 
 class SecurityTest(ProduceConsumeValidateTest):
     """
@@ -62,6 +65,18 @@ class SecurityTest(ProduceConsumeValidateTest):
     def setUp(self):
         self.zk.start()
 
+    def producer_consumer_have_expected_error(self, error):
+        try:
+            for node in self.producer.nodes:
+                node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
+            for node in self.consumer.nodes:
+                node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
+        except RemoteCommandError:
+            return False
+
+        return True
+
+    @cluster(num_nodes=7)
     @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
@@ -74,29 +89,35 @@ class SecurityTest(ProduceConsumeValidateTest):
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = interbroker_security_protocol
-        SecurityConfig.ssl_stores = TestSslStores()
+        SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=False)
 
-        SecurityConfig.ssl_stores.invalid_hostname = True
         self.kafka.start()
         self.create_producer_and_consumer()
         self.producer.log_level = "TRACE"
+
         self.producer.start()
         self.consumer.start()
-        time.sleep(10)
-        assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
-        error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
-        for node in self.producer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
-        for node in self.consumer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
+        try:
+            wait_until(lambda: self.producer.num_acked > 0, timeout_sec=5)
+
+            # Fail quickly if messages are successfully acked
+            raise RuntimeError("Messages published successfully but should not have!"
+                               " Endpoint validation did not fail with invalid hostname")
+        except TimeoutError:
+            # expected
+            pass
+
+        error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
+        wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=5)
         self.producer.stop()
         self.consumer.stop()
         self.producer.log_level = "INFO"
 
-        SecurityConfig.ssl_stores.invalid_hostname = False
+        SecurityConfig.ssl_stores.valid_hostname = True
         for node in self.kafka.nodes:
             self.kafka.restart_node(node, clean_shutdown=True)
+
         self.create_producer_and_consumer()
         self.run_produce_consume_validate()
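The reworked security test above replaces a fixed 10-second sleep with an inverted wait_until: the producer acking anything within the window is the failure case, and the TimeoutError is the expected outcome. The same pattern, reduced to its core (the helper name is illustrative):

    from ducktape.errors import TimeoutError
    from ducktape.utils.util import wait_until

    def assert_nothing_acked(producer, timeout_sec=5):
        try:
            wait_until(lambda: producer.num_acked > 0, timeout_sec=timeout_sec)
            # Reaching this line means a message was acked, which endpoint
            # validation with a bad hostname should have prevented.
            raise RuntimeError("Messages published successfully but should not have!")
        except TimeoutError:
            pass  # expected: no acks within the window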
diff --git a/tests/kafkatest/tests/core/simple_consumer_shell_test.py b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
index 74a7eeb911e..882aae7fba6 100644
--- a/tests/kafkatest/tests/core/simple_consumer_shell_test.py
+++ b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
@@ -16,6 +16,8 @@
 
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+
 from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
 from kafkatest.services.verifiable_producer import VerifiableProducer
 
@@ -26,6 +28,7 @@ MAX_MESSAGES = 100
 NUM_PARTITIONS = 1
 REPLICATION_FACTOR = 1
 
+
 class SimpleConsumerShellTest(Test):
     """
     Tests SimpleConsumerShell tool
@@ -61,6 +64,7 @@ class SimpleConsumerShellTest(Test):
         self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
         self.simple_consumer_shell.start()
 
+    @cluster(num_nodes=4)
     def test_simple_consumer_shell(self):
         """
         Tests if SimpleConsumerShell is fetching expected records
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 2e21322e6f9..9684099ac3c 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -16,6 +16,7 @@ import time
 import math
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.performance import ProducerPerformanceService
@@ -137,6 +138,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                             estimated_throttled_time, time_taken))
 
+    @cluster(num_nodes=10)
     @parametrize(bounce_brokers=False)
     @parametrize(bounce_brokers=True)
     def test_throttled_reassignment(self, bounce_brokers):
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 15a9696ad6e..34af4eb56c7 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 
 import json
 
@@ -60,10 +61,13 @@ class TestUpgrade(ProduceConsumeValidateTest):
             node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
         self.kafka.start_node(node)
 
+    @cluster(num_nodes=6)
     @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+    @cluster(num_nodes=7)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
+    @cluster(num_nodes=6)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
@@ -71,6 +75,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
+    @cluster(num_nodes=7)
     @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
     def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 0cfdf16040b..f8b2146905e 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -92,7 +93,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
             self.kafka.stop_node(node)
             self.kafka.start_node(node)
 
-    @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
+    @cluster(num_nodes=9)
+    @matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"])
     def test_zk_security_upgrade(self, security_protocol):
         self.zk.start()
         self.kafka.security_protocol = security_protocol
@@ -103,7 +105,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
         self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
         self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
 
-        if(self.no_sasl):
+        if self.no_sasl:
             self.kafka.start()
         else:
             self.kafka.start(self.zk.zk_principals)
diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py
index d6746419462..169bbc1aa02 100644
--- a/tests/kafkatest/tests/streams/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_bounce_test.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import ignore
+from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 import time
 
+
 class StreamsBounceTest(KafkaTest):
     """
     Simple test of Kafka Streams.
@@ -41,6 +42,7 @@ class StreamsBounceTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
+    @cluster(num_nodes=5)
     def test_bounce(self):
         """
         Start a smoke test client, then abort (kill -9) and restart it a few times.
@@ -51,11 +53,11 @@ class StreamsBounceTest(KafkaTest):
 
         self.processor1.start()
 
-        time.sleep(15);
+        time.sleep(15)
 
         self.processor1.abortThenRestart()
 
-        time.sleep(15);
+        time.sleep(15)
 
         # enable this after we add change log partition replicas
         #self.kafka.signal_leader("data")
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index ea05c5f4cd0..bc8487837ee 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -13,12 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 from ducktape.mark import ignore
+from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 import time
 
+
 class StreamsSmokeTest(KafkaTest):
     """
     Simple test of Kafka Streams.
@@ -45,6 +47,7 @@ class StreamsSmokeTest(KafkaTest):
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
     @ignore
+    @cluster(num_nodes=7)
     def test_streams(self):
         """
         Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.
@@ -56,14 +59,14 @@ class StreamsSmokeTest(KafkaTest):
 
         self.processor1.start()
         self.processor2.start()
 
-        time.sleep(15);
+        time.sleep(15)
 
         self.processor3.start()
         self.processor1.stop()
 
-        time.sleep(15);
+        time.sleep(15)
 
-        self.processor4.start();
+        self.processor4.start()
 
         self.driver.wait()
         self.driver.stop()
diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py
index 42cfeeabddb..7e0b9eeccec 100644
--- a/tests/kafkatest/tests/tools/log4j_appender_test.py
+++ b/tests/kafkatest/tests/tools/log4j_appender_test.py
@@ -17,6 +17,7 @@ from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -27,6 +28,7 @@ from kafkatest.services.security.security_config import SecurityConfig
 TOPIC = "topic-log4j-appender"
 MAX_MESSAGES = 100
 
+
 class Log4jAppenderTest(Test):
     """
     Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints to a Kafka topic
@@ -62,7 +64,6 @@ class Log4jAppenderTest(Test):
             self.logger.debug("Received message: %s" % msg)
             self.messages_received_count += 1
 
-
     def start_consumer(self, security_protocol):
         enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
@@ -70,7 +71,10 @@ class Log4jAppenderTest(Test):
                                         message_validator=self.custom_message_validator)
         self.consumer.start()
 
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    @cluster(num_nodes=4)
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    @cluster(num_nodes=5)
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
    def test_log4j_appender(self, security_protocol='PLAINTEXT'):
         """
         Tests if KafkaLog4jAppender is producing to Kafka topic
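One detail worth keeping straight in the streams smoke test above: the `ignore` import must stay as long as the `@ignore` decorator remains on test_streams (the hunk here keeps the import as context for that reason), and @ignore stacks with @cluster, presumably so a skipped test still declares the footprint it will need when re-enabled. A sketch of that stacking (the class is hypothetical):

    from ducktape.mark import ignore
    from ducktape.mark.resource import cluster
    from kafkatest.tests.kafka_test import KafkaTest

    class ExampleStreamsTest(KafkaTest):
        @ignore                   # skipped for now...
        @cluster(num_nodes=7)     # ...but the node budget stays declared
        def test_streams(self):
            pass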
""" Tests ReplicaVerificationTool @@ -77,6 +78,7 @@ class ReplicaVerificationToolTest(Test): self.start_kafka(security_protocol, security_protocol) self.start_replica_verification_tool(security_protocol) self.start_producer(max_messages=10, acks=-1, timeout=15) + # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, err_msg="Timed out waiting to reach zero replica lags.") diff --git a/tests/setup.py b/tests/setup.py index cae0a3f7935..e43a4abf075 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -50,7 +50,7 @@ setup(name="kafkatest", license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["ducktape==0.5.3", "requests>=2.5.0"], + install_requires=["ducktape==0.6.0", "requests>=2.5.0"], tests_require=["pytest", "mock"], cmdclass={'test': PyTest}, )