KAFKA-4140: Upgrade to ducktape 0.6.0 and make system tests parallel friendly

Updates to take advantage of soon-to-be-released ducktape features.

Author: Geoff Anderson <geoff@confluent.io>
Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1834 from granders/systest-parallel-friendly
Geoff Anderson 2016-12-11 18:43:23 -08:00 committed by Ewen Cheslack-Postava
parent 6f7ed15dad
commit 62e043a865
45 changed files with 394 additions and 161 deletions
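Most of the diff below follows one pattern: each test method gains ducktape's @cluster(num_nodes=N) decorator so the scheduler knows the test's node budget up front, and heavier parameter combinations are split into a separate @cluster plus @matrix/@parametrize group with a larger budget. A minimal sketch of that pattern, assuming @cluster binds to the parametrize/matrix entries stacked beneath it as the groupings in these diffs suggest (the class, method, and parameter names are illustrative only, not taken from the diff):

from ducktape.mark import matrix, parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

class ExampleTest(Test):
    # Node budget for the lighter parameter group declared just below it.
    @cluster(num_nodes=5)
    @parametrize(acks=1)
    @matrix(compression_type=["none", "snappy"], security_protocol=["PLAINTEXT", "SSL"])
    # A second budget interleaved for the heavier SASL cases, mirroring the stacking below.
    @cluster(num_nodes=6)
    @matrix(compression_type=["none", "snappy"], security_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
    def test_example(self, acks=-1, compression_type="none", security_protocol="PLAINTEXT"):
        pass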

View File

@ -15,6 +15,7 @@
from ducktape.mark import matrix
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.services.service import Service
from ducktape.tests.test import Test
@ -63,11 +64,13 @@ class Benchmark(Test):
self.kafka.log_level = "INFO" # We don't need DEBUG logging here
self.kafka.start()
@cluster(num_nodes=5)
@parametrize(acks=1, topic=TOPIC_REP_ONE)
@parametrize(acks=1, topic=TOPIC_REP_THREE)
@parametrize(acks=-1, topic=TOPIC_REP_THREE)
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
@matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
@cluster(num_nodes=7)
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
broker_version=str(TRUNK)):
@ -97,6 +100,7 @@ class Benchmark(Test):
self.producer.run()
return compute_aggregate_throughput(self.producer)
@cluster(num_nodes=5)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
@ -152,8 +156,11 @@ class Benchmark(Test):
self.logger.info("\n".join(summary))
return data
@cluster(num_nodes=5)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
@cluster(num_nodes=6)
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
interbroker_security_protocol=None, client_version=str(TRUNK),
broker_version=str(TRUNK)):
@ -181,6 +188,7 @@ class Benchmark(Test):
self.perf.run()
return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
@cluster(num_nodes=6)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
@ -229,6 +237,7 @@ class Benchmark(Test):
self.logger.info("\n".join(summary))
return data
@cluster(num_nodes=6)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])

View File

@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import ignore
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
import time
class StreamsSimpleBenchmarkTest(KafkaTest):
"""
@ -29,6 +29,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest):
self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
@cluster(num_nodes=3)
def test_simple_benchmark(self):
"""
Run simple Kafka Streams benchmark

View File

@ -17,6 +17,7 @@ import time
from ducktape.mark import matrix
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
@ -42,9 +43,12 @@ class ConsoleConsumerTest(Test):
def setUp(self):
self.zk.start()
@cluster(num_nodes=3)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
@cluster(num_nodes=4)
@parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
"""Check that console consumer starts/stops properly, and that we are capturing log output."""
@ -66,14 +70,16 @@ class ConsoleConsumerTest(Test):
# Verify that log output is happening
wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10,
err_msg="Timed out waiting for logging to start.")
assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
err_msg="Timed out waiting for consumer log file to exist.")
wait_until(lambda: line_count(node, ConsoleConsumer.LOG_FILE) > 0, timeout_sec=1,
backoff_sec=.25, err_msg="Timed out waiting for log entries to start.")
# Verify no consumed messages
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
self.consumer.stop_node(node)
@cluster(num_nodes=4)
def test_version(self):
"""Check that console consumer v0.8.2.X successfully starts and consumes messages."""
self.kafka.start()

View File

@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from kafkatest.services.kafka import KafkaService, config_property
from kafkatest.services.zookeeper import ZookeeperService
@ -32,6 +33,7 @@ class KafkaVersionTest(Test):
def setUp(self):
self.zk.start()
@cluster(num_nodes=2)
def test_0_8_2(self):
"""Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster."""
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
@ -42,6 +44,7 @@ class KafkaVersionTest(Test):
assert is_version(node, [LATEST_0_8_2])
@cluster(num_nodes=3)
def test_multi_version(self):
"""Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
the other on trunk."""

View File

@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService
@ -35,6 +36,7 @@ class PerformanceServiceTest(Test):
def setUp(self):
self.zk.start()
@cluster(num_nodes=5)
# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
# the overhead should be manageable.
@parametrize(version=str(LATEST_0_8_2), new_consumer=False)

View File

@ -15,6 +15,7 @@
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
@ -44,6 +45,7 @@ class TestVerifiableProducer(Test):
self.zk.start()
self.kafka.start()
@cluster(num_nodes=3)
@parametrize(producer_version=str(LATEST_0_8_2))
@parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(TRUNK))

View File

@ -304,7 +304,8 @@ class VerifiableConnector(object):
self.logger.debug("Ignoring unparseable line: %s", line)
continue
# Filter to only ones matching our name to support multiple verifiable producers
if data['name'] != self.name: continue
if data['name'] != self.name:
continue
data['node'] = node
records.append(data)
return records

View File

@ -15,10 +15,9 @@
import itertools
import os
import subprocess
from ducktape.services.background_thread import BackgroundThreadService
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
@ -211,7 +210,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):

View File

@ -18,11 +18,11 @@ import json
import os.path
import re
import signal
import subprocess
import time
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from config import KafkaConfig
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@ -121,8 +121,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@property
def security_config(self):
return SecurityConfig(self.security_protocol, self.interbroker_security_protocol,
zk_sasl = self.zk.zk_sasl,
return SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
zk_sasl=self.zk.zk_sasl,
client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
def open_port(self, protocol):
@ -208,7 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
node.account.ssh(cmd)
monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
monitor.wait_until("Kafka Server.*started", timeout_sec=30, backoff_sec=.25, err_msg="Kafka server didn't finish startup")
self.start_jmx_tool(self.idx(node), node)
if len(self.pids(node)) == 0:
@ -221,7 +221,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
except (RemoteCommandError, ValueError) as e:
return []
def signal_node(self, node, sig=signal.SIGTERM):

View File

@ -34,7 +34,8 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
self.topic = topic
self.max_messages = max_messages
self.security_protocol = security_protocol
self.security_config = SecurityConfig(security_protocol)
self.security_config = SecurityConfig(self.context, security_protocol)
self.stop_timeout_sec = 30
def _worker(self, idx, node):
cmd = self.start_cmd(node)

View File

@ -14,10 +14,10 @@
# limitations under the License.
import os
import subprocess
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@ -145,7 +145,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
except (RemoteCommandError, ValueError):
return []
def alive(self, node):

View File

@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
@ -31,12 +34,19 @@ class JmxMixin(object):
self.maximum_jmx_value = {} # map from object_attribute_name to maximum value observed over time
self.average_jmx_value = {} # map from object_attribute_name to average value observed over time
self.jmx_tool_log = "/mnt/jmx_tool.log"
def clean_node(self, node):
node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
node.account.ssh("rm -rf %s" % self.jmx_tool_log, allow_fail=False)
def start_jmx_tool(self, idx, node):
if self.started[idx-1] or self.jmx_object_names is None:
if self.jmx_object_names is None:
self.logger.debug("%s: Not starting jmx tool because no jmx objects are defined" % node.account)
return
if self.started[idx-1]:
self.logger.debug("%s: jmx tool has been started already on this node" % node.account)
return
cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
@ -45,31 +55,43 @@ class JmxMixin(object):
cmd += " --object-name %s" % jmx_object_name
for jmx_attribute in self.jmx_attributes:
cmd += " --attributes %s" % jmx_attribute
cmd += " | tee -a /mnt/jmx_tool.log"
self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
jmx_output.next()
cmd += " >> %s &" % self.jmx_tool_log
self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
node.account.ssh(cmd, allow_fail=False)
wait_until(lambda: self._jmx_has_output(node), timeout_sec=5, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
self.started[idx-1] = True
def _jmx_has_output(self, node):
"""Helper used as a proxy to determine whether jmx is running by that jmx_tool_log contains output."""
try:
node.account.ssh("test -z \"$(cat %s)\"" % self.jmx_tool_log, allow_fail=False)
return False
except RemoteCommandError:
return True
def read_jmx_output(self, idx, node):
if self.started[idx-1] == False:
if not self.started[idx-1]:
return
object_attribute_names = []
cmd = "cat /mnt/jmx_tool.log"
cmd = "cat %s" % self.jmx_tool_log
self.logger.debug("Read jmx output %d command: %s", idx, cmd)
for line in node.account.ssh_capture(cmd, allow_fail=False):
lines = [line for line in node.account.ssh_capture(cmd, allow_fail=False)]
assert len(lines) > 1, "There don't appear to be any samples in the jmx tool log: %s" % lines
for line in lines:
if "time" in line:
object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
continue
stats = [float(field) for field in line.split(',')]
time_sec = int(stats[0]/1000)
self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
self.jmx_stats[idx-1][time_sec] = {name: stats[i+1] for i, name in enumerate(object_attribute_names)}
# do not calculate average and maximum of jmx stats until we have read output from all nodes
# If the service is multithreaded, this means that the results will be aggregated only when the last
# service finishes
if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
return
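
The JmxTool startup change above swaps a blocking ssh_capture stream for a detached background invocation (">> jmx_tool_log &") plus a poll on the remote log file. A standalone sketch of that polling idiom, built only from the ducktape calls already used in this hunk (the helper name and log path are illustrative):

from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until

def remote_file_has_output(node, path):
    # test -z "$(cat <path>)" exits 0 only when the file is empty or yields no output,
    # so a RemoteCommandError here means the file already has content.
    try:
        node.account.ssh("test -z \"$(cat %s)\"" % path, allow_fail=False)
        return False
    except RemoteCommandError:
        return True

# Poll until the backgrounded tool has written at least one line, then proceed, e.g.:
# wait_until(lambda: remote_file_has_output(node, "/mnt/jmx_tool.log"),
#            timeout_sec=5, backoff_sec=.5, err_msg="tool took too long to start")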

View File

@ -14,9 +14,9 @@
# limitations under the License.
import os
import subprocess
import time
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.services.monitor.jmx import JmxMixin
@ -118,7 +118,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
cmd = "jps | grep -i ProducerPerformance | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):
@ -136,6 +136,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
self.logger.debug("Producer performance %d command: %s", idx, cmd)
# start ProducerPerformance process
start = time.time()
producer_output = node.account.ssh_capture(cmd)
wait_until(lambda: self.alive(node), timeout_sec=20, err_msg="ProducerPerformance failed to start")
# block until there is at least one line of output
@ -144,7 +145,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
raise Exception("No output from ProducerPerformance")
self.start_jmx_tool(idx, node)
wait_until(lambda: not self.alive(node), timeout_sec=1200, err_msg="ProducerPerformance failed to finish")
wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish")
elapsed = time.time() - start
self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed)
self.read_jmx_output(idx, node)
# parse producer output from file

View File

@ -78,7 +78,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
def wait(self):
for node in self.nodes:
for pid in self.pids(node):
wait_until(lambda: not node.account.alive(pid), timeout_sec=600, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
wait_until(lambda: not node.account.alive(pid), timeout_sec=600, backoff_sec=1, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)

View File

@ -36,7 +36,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
self.topic = topic
self.report_interval_ms = report_interval_ms
self.security_protocol = security_protocol
self.security_config = SecurityConfig(security_protocol)
self.security_config = SecurityConfig(self.context, security_protocol)
self.partition_lag = {}
self.stop_timeout_sec = stop_timeout_sec
@ -65,6 +65,7 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
topic_partition = topic + ',' + str(partition)
lag = self.partition_lag.get(topic_partition, -1)
self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag))
return lag
def start_cmd(self, node):

View File

@ -14,6 +14,7 @@
# limitations under the License.
import os
import random
import uuid
from io import open
from os import remove, close
@ -39,14 +40,48 @@ class MiniKdc(KafkaPathResolverMixin, Service):
KEYTAB_FILE = "/mnt/minikdc/keytab"
KRB5CONF_FILE = "/mnt/minikdc/krb5.conf"
LOG_FILE = "/mnt/minikdc/minikdc.log"
LOCAL_KEYTAB_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_keytab"
LOCAL_KRB5CONF_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_krb5.conf"
LOCAL_KEYTAB_FILE = None
LOCAL_KRB5CONF_FILE = None
@staticmethod
def _set_local_keytab_file(local_scratch_dir):
"""Set MiniKdc.LOCAL_KEYTAB_FILE exactly once per test.
LOCAL_KEYTAB_FILE is currently used like a global variable to provide a mechanism to share the
location of the local keytab file among all services which might need it.
Since individual ducktape tests are each run in a subprocess forked from the ducktape main process,
class variables set at class load time are duplicated between test processes. This leads to collisions
if test subprocesses are run in parallel, so we defer setting these class variables until after the test itself
begins to run.
"""
if MiniKdc.LOCAL_KEYTAB_FILE is None:
MiniKdc.LOCAL_KEYTAB_FILE = os.path.join(local_scratch_dir, "keytab")
return MiniKdc.LOCAL_KEYTAB_FILE
@staticmethod
def _set_local_krb5conf_file(local_scratch_dir):
"""Set MiniKdc.LOCAL_KRB5CONF_FILE exactly once per test.
See _set_local_keytab_file for details on why we do this.
"""
if MiniKdc.LOCAL_KRB5CONF_FILE is None:
MiniKdc.LOCAL_KRB5CONF_FILE = os.path.join(local_scratch_dir, "krb5conf")
return MiniKdc.LOCAL_KRB5CONF_FILE
def __init__(self, context, kafka_nodes, extra_principals=""):
super(MiniKdc, self).__init__(context, 1)
self.kafka_nodes = kafka_nodes
self.extra_principals = extra_principals
# context.local_scratch_dir uses a ducktape feature:
# each test_context object has a unique local scratch directory which is available for the duration of the test
# and is automatically garbage collected after the test finishes
MiniKdc._set_local_keytab_file(context.local_scratch_dir)
MiniKdc._set_local_krb5conf_file(context.local_scratch_dir)
def replace_in_file(self, file_path, pattern, subst):
fh, abs_path = mkstemp()
with open(abs_path, 'w') as new_file:
@ -80,8 +115,8 @@ class MiniKdc(KafkaPathResolverMixin, Service):
node.account.ssh(cmd)
monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup")
node.account.scp_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
node.account.scp_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
node.account.copy_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
node.account.copy_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
# KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address
self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname)
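
The _set_local_* helpers above exist because ducktape forks a subprocess per test: a path computed at class-load time (as LOCAL_KEYTAB_FILE used to be, via uuid at import) is duplicated into every test process and can collide when tests run in parallel, whereas context.local_scratch_dir is unique per test. A minimal sketch of that deferred class-attribute pattern, assuming the same per-test scratch directory; ExampleService and the file name are illustrative, not from this diff:

import os

class ExampleService(object):
    # Deliberately left unset at import time; see __init__.
    LOCAL_SCRATCH_FILE = None

    def __init__(self, context):
        # Defer the assignment until the test is actually running, so the value is derived
        # from the per-test scratch directory rather than state created at class load.
        if ExampleService.LOCAL_SCRATCH_FILE is None:
            ExampleService.LOCAL_SCRATCH_FILE = os.path.join(context.local_scratch_dir, "scratch_file")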

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import os
import subprocess
from tempfile import mkdtemp
@ -22,21 +21,23 @@ from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
import itertools
class SslStores(object):
def __init__(self):
self.ca_and_truststore_dir = mkdtemp(dir="/tmp")
self.ca_crt_path = os.path.join(self.ca_and_truststore_dir, "test.ca.crt")
self.ca_jks_path = os.path.join(self.ca_and_truststore_dir, "test.ca.jks")
def __init__(self, local_scratch_dir):
self.ca_crt_path = os.path.join(local_scratch_dir, "test.ca.crt")
self.ca_jks_path = os.path.join(local_scratch_dir, "test.ca.jks")
self.ca_passwd = "test-ca-passwd"
self.truststore_path = os.path.join(self.ca_and_truststore_dir, "test.truststore.jks")
self.truststore_path = os.path.join(local_scratch_dir, "test.truststore.jks")
self.truststore_passwd = "test-ts-passwd"
self.keystore_passwd = "test-ks-passwd"
self.key_passwd = "test-key-passwd"
# Allow up to one hour of clock skew between host and VMs
self.startdate = "-1H"
# Register rmtree to run on exit
atexit.register(rmtree, self.ca_and_truststore_dir)
for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]:
if os.path.exists(file):
os.remove(file)
def generate_ca(self):
"""
@ -69,7 +70,7 @@ class SslStores(object):
self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s -startdate %s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node), self.startdate))
self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path))
self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH)
node.account.copy_to(ks_path, SecurityConfig.KEYSTORE_PATH)
rmtree(ks_dir)
def hostname(self, node):
@ -79,9 +80,10 @@ class SslStores(object):
def runcmd(self, cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
proc.communicate()
stdout, stderr = proc.communicate()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, cmd)
raise RuntimeError("Command '%s' returned non-zero exit status %d: %s" % (cmd, proc.returncode, stdout))
class SecurityConfig(TemplateRenderer):
@ -99,11 +101,10 @@ class SecurityConfig(TemplateRenderer):
KRB5CONF_PATH = "/mnt/security/krb5.conf"
KEYTAB_PATH = "/mnt/security/keytab"
ssl_stores = SslStores()
ssl_stores.generate_ca()
ssl_stores.generate_truststore()
# This is initialized only when the first instance of SecurityConfig is created
ssl_stores = None
def __init__(self, security_protocol=None, interbroker_security_protocol=None,
def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
zk_sasl=False, template_props=""):
"""
@ -114,6 +115,15 @@ class SecurityConfig(TemplateRenderer):
template properties either, PLAINTEXT is used as default.
"""
self.context = context
if not SecurityConfig.ssl_stores:
# This generates keystore/truststore files in a local scratch directory which gets
# automatically destroyed after the test is run
# Creating within the scratch directory allows us to run tests in parallel without fear of collision
SecurityConfig.ssl_stores = SslStores(context.local_scratch_dir)
SecurityConfig.ssl_stores.generate_ca()
SecurityConfig.ssl_stores.generate_truststore()
if security_protocol is None:
security_protocol = self.get_property('security.protocol', template_props)
if security_protocol is None:
@ -140,13 +150,12 @@ class SecurityConfig(TemplateRenderer):
'sasl.kerberos.service.name' : 'kafka'
}
def client_config(self, template_props=""):
return SecurityConfig(self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
def setup_ssl(self, node):
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
node.account.copy_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
SecurityConfig.ssl_stores.generate_and_copy_keystore(node)
def setup_sasl(self, node):
@ -162,8 +171,8 @@ class SecurityConfig(TemplateRenderer):
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
if self.has_sasl_kerberos:
node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
node.account.scp_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
node.account.copy_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
def setup_node(self, node):
if self.has_ssl:

View File

@ -16,9 +16,9 @@
import json
import os
import signal
import subprocess
from ducktape.services.background_thread import BackgroundThreadService
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition
@ -243,7 +243,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
except (RemoteCommandError, ValueError) as e:
return []
def try_parse_json(self, string):

View File

@ -16,19 +16,22 @@
import json
import os
import signal
import subprocess
import time
from ducktape.services.background_thread import BackgroundThreadService
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.utils import is_int, is_int_with_prefix
from kafkatest.version import TRUNK, LATEST_0_8_2
from kafkatest.utils.remote_account import line_count
class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
PERSISTENT_ROOT = "/mnt/verifiable_producer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr")
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
@ -38,6 +41,9 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
"verifiable_producer_stdout": {
"path": STDOUT_CAPTURE,
"collect_default": False},
"verifiable_producer_stderr": {
"path": STDERR_CAPTURE,
"collect_default": False},
"verifiable_producer_log": {
"path": LOG_FILE,
"collect_default": True}
@ -114,7 +120,25 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
self.produced_count[idx] = 0
last_produced_time = time.time()
prev_msg = None
for line in node.account.ssh_capture(cmd):
node.account.ssh(cmd)
# Ensure that STDOUT_CAPTURE exists before trying to read from it
# Note that if max_messages is configured, it's possible for the process to exit before this
# wait_until condition is checked
start = time.time()
wait_until(lambda: node.account.isfile(VerifiableProducer.STDOUT_CAPTURE) and
line_count(node, VerifiableProducer.STDOUT_CAPTURE) > 0,
timeout_sec=10, err_msg="%s: VerifiableProducer took too long to start" % node.account)
self.logger.debug("%s: VerifiableProducer took %s seconds to start" % (node.account, time.time() - start))
with node.account.open(VerifiableProducer.STDOUT_CAPTURE, 'r') as f:
while True:
line = f.readline()
if line == '' and not self.alive(node):
# The process is gone, and we've reached the end of the output file, so we don't expect
# any more output to appear in the STDOUT_CAPTURE file
break
line = line.strip()
data = self.try_parse_json(line)
@ -146,6 +170,14 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
self.clean_shutdown_nodes.add(node)
def _has_output(self, node):
"""Helper used as a proxy to determine whether jmx is running by that jmx_tool_log contains output."""
try:
node.account.ssh("test -z \"$(cat %s)\"" % VerifiableProducer.STDOUT_CAPTURE, allow_fail=False)
return False
except RemoteCommandError:
return True
def start_cmd(self, node, idx):
cmd = ""
if node.version <= LATEST_0_8_2:
@ -171,10 +203,10 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
if self.message_validator == is_int_with_prefix:
cmd += " --value-prefix %s" % str(idx)
if self.acks is not None:
cmd += " --acks %s\n" % str(self.acks)
cmd += " --acks %s " % str(self.acks)
cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
cmd += " 2>> %s 1>> %s &" % (VerifiableProducer.STDERR_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
return cmd
def kill_node(self, node, clean_shutdown=True, allow_fail=False):
@ -190,7 +222,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):

View File

@ -15,10 +15,11 @@
import re
import subprocess
import time
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.security.security_config import SecurityConfig
@ -46,7 +47,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
@property
def security_config(self):
return SecurityConfig(zk_sasl=self.zk_sasl)
return SecurityConfig(self.context, zk_sasl=self.zk_sasl)
@property
def security_system_properties(self):
@ -85,7 +86,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
except (RemoteCommandError, ValueError) as e:
return []
def alive(self, node):
@ -95,6 +96,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
node.account.kill_process("zookeeper", allow_fail=False)
wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.")
def clean_node(self, node):
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)

View File

@ -15,6 +15,7 @@
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -23,6 +24,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix
class CompressionTest(ProduceConsumeValidateTest):
"""
These tests validate produce / consume for compressed topics.
@ -51,6 +53,7 @@ class CompressionTest(ProduceConsumeValidateTest):
# Override this since we're adding services outside of the constructor
return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@cluster(num_nodes=7)
@parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
@parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
def test_compressed_topic(self, compression_types, new_consumer):

View File

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.utils.util import wait_until
from ducktape.mark.resource import cluster
from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
from kafkatest.services.kafka import TopicPartition
@ -43,6 +44,7 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
@cluster(num_nodes=4)
def rolling_update_test(self):
"""
Verify rolling updates of partition assignment strategies works correctly. In this

View File

@ -15,12 +15,14 @@
from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from ducktape.mark.resource import cluster
from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
from kafkatest.services.kafka import TopicPartition
import signal
class OffsetValidationTest(VerifiableConsumerTest):
TOPIC = "test_topic"
NUM_PARTITIONS = 1
@ -72,6 +74,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
return consumer
@cluster(num_nodes=7)
def test_broker_rolling_bounce(self):
"""
Verify correct consumer behavior when the brokers are consecutively restarted.
@ -112,6 +115,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.current_position(partition) == consumer.total_consumed(), \
"Total consumed records did not match consumed position"
@cluster(num_nodes=7)
@matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
def test_consumer_bounce(self, clean_shutdown, bounce_mode):
"""
@ -152,6 +156,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.current_position(partition) <= consumer.total_consumed(), \
"Current position greater than the total number of consumed records"
@cluster(num_nodes=7)
@matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
def test_consumer_failure(self, clean_shutdown, enable_autocommit):
partition = TopicPartition(self.TOPIC, 0)
@ -194,7 +199,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.last_commit(partition) == consumer.current_position(partition), \
"Last committed offset did not match last consumed position"
@cluster(num_nodes=7)
@matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
def test_broker_failure(self, clean_shutdown, enable_autocommit):
partition = TopicPartition(self.TOPIC, 0)
@ -229,6 +234,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert consumer.last_commit(partition) == consumer.current_position(partition), \
"Last committed offset did not match last consumed position"
@cluster(num_nodes=7)
def test_group_consumption(self):
"""
Verifies correct group rebalance behavior as consumers are started and stopped.
@ -277,6 +283,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
})
@cluster(num_nodes=6)
@matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
"org.apache.kafka.clients.consumer.RoundRobinAssignor"])
def test_valid_assignment(self, assignment_strategy):

View File

@ -14,6 +14,7 @@
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
from ducktape.mark.resource import cluster
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
@ -56,6 +57,7 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
timeout_sec=120, backoff_sec=1,
err_msg="Producer did not produce all messages in reasonable amount of time"))
@cluster(num_nodes=10)
@parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
def test_compatibility(self, producer_version, consumer_version):

View File

@ -15,6 +15,7 @@
from ducktape.tests.test import Test
from ducktape.mark import matrix, parametrize
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -124,6 +125,7 @@ class QuotaTest(Test):
"""Override this since we're adding services outside of the constructor"""
return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@cluster(num_nodes=5)
@matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
@parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):

View File

@ -14,18 +14,22 @@
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from ducktape.mark import matrix, parametrize
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
import subprocess, itertools, time
import itertools, time
from collections import Counter, namedtuple
import operator
class ConnectDistributedTest(Test):
"""
Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
@ -139,6 +143,7 @@ class ConnectDistributedTest(Test):
status = self._connector_status(connector.name, node)
return self._task_has_state(task_id, status, 'RUNNING')
@cluster(num_nodes=5)
def test_restart_failed_connector(self):
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@ -155,6 +160,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
err_msg="Failed to see connector transition to the RUNNING state")
@cluster(num_nodes=5)
@matrix(connector_type=["source", "sink"])
def test_restart_failed_task(self, connector_type):
self.setup_services()
@ -178,7 +184,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self.task_is_running(connector, task_id), timeout_sec=10,
err_msg="Failed to see task transition to the RUNNING state")
@cluster(num_nodes=5)
def test_pause_and_resume_source(self):
"""
Verify that source connectors stop producing records when paused and begin again after
@ -217,6 +223,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30,
err_msg="Failed to produce messages after resuming source connector")
@cluster(num_nodes=5)
def test_pause_and_resume_sink(self):
"""
Verify that sink connectors stop consuming records when paused and begin again after
@ -259,7 +266,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
err_msg="Failed to consume messages after resuming source connector")
@cluster(num_nodes=5)
def test_pause_state_persistent(self):
"""
Verify that paused state is preserved after a cluster restart.
@ -284,7 +291,10 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
err_msg="Failed to see connector startup in PAUSED state")
@matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
@cluster(num_nodes=5)
@parametrize(security_protocol=SecurityConfig.PLAINTEXT)
@cluster(num_nodes=6)
@parametrize(security_protocol=SecurityConfig.SASL_SSL)
def test_file_source_and_sink(self, security_protocol):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
@ -315,7 +325,7 @@ class ConnectDistributedTest(Test):
node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
@cluster(num_nodes=5)
@matrix(clean=[True, False])
def test_bounce(self, clean):
"""
@ -424,8 +434,6 @@ class ConnectDistributedTest(Test):
assert success, "Found validation errors:\n" + "\n ".join(errors)
def _validate_file_output(self, input):
input_set = set(input)
# Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
@ -437,8 +445,8 @@ class ConnectDistributedTest(Test):
def _file_contents(self, node, file):
try:
# Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
# Convert to a list here or the RemoteCommandError may be returned during a call to the generator instead of
# immediately
return list(node.account.ssh_capture("cat " + file))
except subprocess.CalledProcessError:
except RemoteCommandError:
return []

View File

@ -16,7 +16,9 @@
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
from ducktape.utils.util import wait_until
import subprocess
from ducktape.mark.resource import cluster
from ducktape.cluster.remoteaccount import RemoteCommandError
import json
import itertools
@ -57,6 +59,7 @@ class ConnectRestApiTest(KafkaTest):
self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
@cluster(num_nodes=4)
def test_rest_api(self):
# Template parameters
self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
@ -171,10 +174,10 @@ class ConnectRestApiTest(KafkaTest):
def file_contents(self, node, file):
try:
# Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
# Convert to a list here or the RemoteCommandError may be returned during a call to the generator instead of
# immediately
return list(node.account.ssh_capture("cat " + file))
except subprocess.CalledProcessError:
except RemoteCommandError:
return []
def _config_dict_from_props(self, connector_props):

View File

@ -14,15 +14,20 @@
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize, matrix
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectStandaloneService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize, matrix
import hashlib, subprocess, json
import hashlib
import json
class ConnectStandaloneFileTest(Test):
"""
@ -58,10 +63,13 @@ class ConnectStandaloneFileTest(Test):
self.zk = ZookeeperService(test_context, self.num_zk)
@cluster(num_nodes=5)
@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
@parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
@matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
@parametrize(security_protocol=SecurityConfig.PLAINTEXT)
@cluster(num_nodes=6)
@parametrize(security_protocol=SecurityConfig.SASL_SSL)
def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
"""
Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. Includes
@ -85,7 +93,6 @@ class ConnectStandaloneFileTest(Test):
self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
consumer_timeout_ms=1000)
self.zk.start()
self.kafka.start()
@ -118,5 +125,5 @@ class ConnectStandaloneFileTest(Test):
try:
output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
return output_hash == hashlib.md5(value).hexdigest()
except subprocess.CalledProcessError:
except RemoteCommandError:
return False

View File

@ -14,6 +14,7 @@
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
from ducktape.mark.resource import cluster
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
@ -43,6 +44,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
self.num_consumers = 1
self.messages_per_producer = 1000
@cluster(num_nodes=6)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)

View File

@ -17,6 +17,7 @@
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -28,6 +29,7 @@ import re
TOPIC = "topic-consumer-group-command"
class ConsumerGroupCommandTest(Test):
"""
Tests ConsumerGroupCommand
@ -89,6 +91,7 @@ class ConsumerGroupCommandTest(Test):
self.consumer.stop()
@cluster(num_nodes=3)
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
"""
@ -97,6 +100,7 @@ class ConsumerGroupCommandTest(Test):
"""
self.setup_and_verify(security_protocol)
@cluster(num_nodes=3)
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
"""

View File

@ -16,8 +16,9 @@
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from kafkatest.services.verifiable_producer import VerifiableProducer
from ducktape.mark.resource import cluster
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.console_consumer import ConsoleConsumer
@ -28,6 +29,7 @@ MAX_MESSAGES = 100
NUM_PARTITIONS = 1
REPLICATION_FACTOR = 1
class GetOffsetShellTest(Test):
"""
Tests GetOffsetShell tool
@ -44,7 +46,6 @@ class GetOffsetShellTest(Test):
self.zk = ZookeeperService(test_context, self.num_zk)
def setUp(self):
self.zk.start()
@ -69,6 +70,7 @@ class GetOffsetShellTest(Test):
consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
self.consumer.start()
@cluster(num_nodes=4)
def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
"""
Tests if GetOffsetShell is getting offsets correctly

View File

@ -14,7 +14,8 @@
# limitations under the License.
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize, matrix, ignore
from ducktape.mark import parametrize, matrix
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -110,8 +111,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
@cluster(num_nodes=7)
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
@matrix(security_protocol=['PLAINTEXT', 'SSL'], new_consumer=[True])
@cluster(num_nodes=8)
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
def test_simple_end_to_end(self, security_protocol, new_consumer):
"""
Test end-to-end behavior under non-failure conditions.
@ -140,8 +144,11 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
self.mirror_maker.stop()
@cluster(num_nodes=7)
@matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
@matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
@matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL'])
@cluster(num_nodes=8)
@matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
"""
Test end-to-end behavior under failure conditions.

View File

@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from kafkatest.services.zookeeper import ZookeeperService
@ -24,6 +25,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
import random
class ReassignPartitionsTest(ProduceConsumeValidateTest):
"""
These tests validate partition reassignment.
@ -86,6 +88,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
# Wait until finished or timeout
wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
@cluster(num_nodes=7)
@parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
@parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
def test_reassign_partitions(self, bounce_brokers, security_protocol):

View File

@ -16,6 +16,7 @@
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -118,7 +119,7 @@ class ReplicationTest(ProduceConsumeValidateTest):
"""Override this since we're adding services outside of the constructor"""
return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["leader"],
security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])

View File

@ -20,8 +20,8 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils import is_int
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from ducktape.mark import parametrize
from ducktape.mark import matrix
from ducktape.mark import parametrize, matrix
from ducktape.mark.resource import cluster
from kafkatest.services.security.kafka_acls import ACLs
import time
@ -102,7 +102,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
# Bounce again with ACLs for new mechanism
self.set_authorizer_and_bounce(security_protocol, security_protocol)
@matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
@cluster(num_nodes=8)
@matrix(client_protocol=["SSL"])
@cluster(num_nodes=9)
@matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
def test_rolling_upgrade_phase_one(self, client_protocol):
"""
Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
@ -123,6 +126,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.create_producer_and_consumer()
self.run_produce_consume_validate(lambda: time.sleep(1))
@cluster(num_nodes=8)
@matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
"""
@ -143,6 +147,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
# Roll in the security protocol. Disable PLAINTEXT. Ensure we can produce and consume throughout
self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
@cluster(num_nodes=9)
@parametrize(new_client_sasl_mechanism='PLAIN')
def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
"""
@ -166,6 +171,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.create_producer_and_consumer()
self.run_produce_consume_validate(lambda: time.sleep(1))
@cluster(num_nodes=8)
@parametrize(new_sasl_mechanism='PLAIN')
def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
"""

View File

@ -13,7 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from ducktape.errors import TimeoutError
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -23,20 +27,19 @@ from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.security.security_config import SslStores
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
import time
class TestSslStores(SslStores):
def __init__(self):
super(TestSslStores, self).__init__()
self.invalid_hostname = False
def __init__(self, local_scratch_dir, valid_hostname=True):
super(TestSslStores, self).__init__(local_scratch_dir)
self.valid_hostname = valid_hostname
self.generate_ca()
self.generate_truststore()
def hostname(self, node):
if (self.invalid_hostname):
return "invalidhost"
else:
if self.valid_hostname:
return super(TestSslStores, self).hostname(node)
else:
return "invalidhostname"
class SecurityTest(ProduceConsumeValidateTest):
"""
@ -62,6 +65,18 @@ class SecurityTest(ProduceConsumeValidateTest):
def setUp(self):
self.zk.start()
def producer_consumer_have_expected_error(self, error):
try:
for node in self.producer.nodes:
node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
for node in self.consumer.nodes:
node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
except RemoteCommandError:
return False
return True
@cluster(num_nodes=7)
@parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
@ -74,29 +89,35 @@ class SecurityTest(ProduceConsumeValidateTest):
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = interbroker_security_protocol
SecurityConfig.ssl_stores = TestSslStores()
SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=False)
SecurityConfig.ssl_stores.invalid_hostname = True
self.kafka.start()
self.create_producer_and_consumer()
self.producer.log_level = "TRACE"
self.producer.start()
self.consumer.start()
time.sleep(10)
assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
for node in self.producer.nodes:
node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
for node in self.consumer.nodes:
node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
try:
wait_until(lambda: self.producer.num_acked > 0, timeout_sec=5)
# Fail quickly if messages are successfully acked
raise RuntimeError("Messages published successfully but should not have!"
" Endpoint validation did not fail with invalid hostname")
except TimeoutError:
# expected
pass
error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=5)
self.producer.stop()
self.consumer.stop()
self.producer.log_level = "INFO"
SecurityConfig.ssl_stores.invalid_hostname = False
SecurityConfig.ssl_stores.valid_hostname = True
for node in self.kafka.nodes:
self.kafka.restart_node(node, clean_shutdown=True)
self.create_producer_and_consumer()
self.run_produce_consume_validate()
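
The change above replaces a fixed sleep-and-assert with a wait_until that is expected to time out, and moves the log grep behind the producer_consumer_have_expected_error helper. A minimal standalone sketch of that timeout-as-success pattern, assuming only a hypothetical producer object exposing a num_acked attribute (not the actual service classes above):

from ducktape.errors import TimeoutError
from ducktape.utils.util import wait_until

def assert_no_acks(producer, timeout_sec=5):
    # Expect the wait to time out: if any message is acked within the
    # window, the condition becomes true early and the test should fail.
    try:
        wait_until(lambda: producer.num_acked > 0, timeout_sec=timeout_sec)
        raise RuntimeError("Messages were acked but endpoint validation should have rejected the connection")
    except TimeoutError:
        pass  # expected: no messages were acked within the timeout
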

View File

@ -16,6 +16,8 @@
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
from kafkatest.services.verifiable_producer import VerifiableProducer
@ -26,6 +28,7 @@ MAX_MESSAGES = 100
NUM_PARTITIONS = 1
REPLICATION_FACTOR = 1
class SimpleConsumerShellTest(Test):
"""
Tests SimpleConsumerShell tool
@ -61,6 +64,7 @@ class SimpleConsumerShellTest(Test):
self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
self.simple_consumer_shell.start()
@cluster(num_nodes=4)
def test_simple_consumer_shell(self):
"""
Tests if SimpleConsumerShell is fetching expected records

View File

@ -16,6 +16,7 @@
import time
import math
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from kafkatest.services.performance import ProducerPerformanceService
@ -137,6 +138,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
estimated_throttled_time,
time_taken))
@cluster(num_nodes=10)
@parametrize(bounce_brokers=False)
@parametrize(bounce_brokers=True)
def test_throttled_reassignment(self, bounce_brokers):

View File

@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
import json
@ -60,10 +61,13 @@ class TestUpgrade(ProduceConsumeValidateTest):
node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
self.kafka.start_node(node)
@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
@ -71,6 +75,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,

View File

@ -14,6 +14,7 @@
# limitations under the License.
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -92,7 +93,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
self.kafka.stop_node(node)
self.kafka.start_node(node)
@matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
@cluster(num_nodes=9)
@matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"])
def test_zk_security_upgrade(self, security_protocol):
self.zk.start()
self.kafka.security_protocol = security_protocol
@ -103,7 +105,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
if(self.no_sasl):
if self.no_sasl:
self.kafka.start()
else:
self.kafka.start(self.zk.zk_principals)
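
The @cluster annotations added throughout these tests are the ducktape 0.6.0 feature this patch adopts: each test method, or each group of @matrix/@parametrize variants, declares its node footprint up front so the runner can schedule tests in parallel without over-allocating machines. A hedged, illustrative sketch with made-up test and parameter names, following the stacking pattern used in these files (each @cluster applies to the variant lines directly beneath it):

from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

class ExampleClusterUsageTest(Test):
    """Hypothetical test illustrating per-variant cluster sizing."""

    # Each @cluster sets the node budget for the @matrix entries directly
    # beneath it, so lighter variants reserve fewer nodes.
    @cluster(num_nodes=4)
    @matrix(compression_type=["none", "snappy"])
    @cluster(num_nodes=6)
    @matrix(compression_type=["lz4", "gzip"])
    def test_with_varying_footprint(self, compression_type):
        pass  # body elided in this sketch
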

View File

@ -13,12 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import ignore
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
import time
class StreamsBounceTest(KafkaTest):
"""
Simple test of Kafka Streams.
@ -41,6 +42,7 @@ class StreamsBounceTest(KafkaTest):
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
@cluster(num_nodes=5)
def test_bounce(self):
"""
Start a smoke test client, then abort (kill -9) and restart it a few times.
@ -51,11 +53,11 @@ class StreamsBounceTest(KafkaTest):
self.processor1.start()
time.sleep(15);
time.sleep(15)
self.processor1.abortThenRestart()
time.sleep(15);
time.sleep(15)
# enable this after we add change log partition replicas
#self.kafka.signal_leader("data")

View File

@ -13,12 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import ignore
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
import time
class StreamsSmokeTest(KafkaTest):
"""
Simple test of Kafka Streams.
@ -45,6 +47,7 @@ class StreamsSmokeTest(KafkaTest):
self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
@ignore
@cluster(num_nodes=7)
def test_streams(self):
"""
Start a few smoke test clients, then repeatedly start a new one and cleanly stop a running one, a few times.
@ -56,14 +59,14 @@ class StreamsSmokeTest(KafkaTest):
self.processor1.start()
self.processor2.start()
time.sleep(15);
time.sleep(15)
self.processor3.start()
self.processor1.stop()
time.sleep(15);
time.sleep(15)
self.processor4.start();
self.processor4.start()
self.driver.wait()
self.driver.stop()

View File

@ -17,6 +17,7 @@
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
@ -27,6 +28,7 @@ from kafkatest.services.security.security_config import SecurityConfig
TOPIC = "topic-log4j-appender"
MAX_MESSAGES = 100
class Log4jAppenderTest(Test):
"""
Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints to a Kafka topic
@ -62,7 +64,6 @@ class Log4jAppenderTest(Test):
self.logger.debug("Received message: %s" % msg)
self.messages_received_count += 1
def start_consumer(self, security_protocol):
enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
@ -70,7 +71,10 @@ class Log4jAppenderTest(Test):
message_validator=self.custom_message_validator)
self.consumer.start()
@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
@cluster(num_nodes=4)
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
@cluster(num_nodes=5)
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
def test_log4j_appender(self, security_protocol='PLAINTEXT'):
"""
Tests if KafkaLog4jAppender is producing to Kafka topic

View File

@ -16,8 +16,9 @@
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from kafkatest.services.verifiable_producer import VerifiableProducer
from ducktape.mark.resource import cluster
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.replica_verification_tool import ReplicaVerificationTool
@ -59,9 +60,8 @@ class ReplicaVerificationToolTest(Test):
def start_producer(self, max_messages, acks, timeout):
# This will produce to kafka cluster
current_acked = 0
self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages)
current_acked = self.producer.num_acked
self.logger.info("current_acked = %s" % current_acked)
self.producer.start()
wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout,
err_msg="Timeout awaiting messages to be produced and acked")
@ -69,6 +69,7 @@ class ReplicaVerificationToolTest(Test):
def stop_producer(self):
self.producer.stop()
@cluster(num_nodes=6)
def test_replica_lags(self, security_protocol='PLAINTEXT'):
"""
Tests ReplicaVerificationTool
@ -77,6 +78,7 @@ class ReplicaVerificationToolTest(Test):
self.start_kafka(security_protocol, security_protocol)
self.start_replica_verification_tool(security_protocol)
self.start_producer(max_messages=10, acks=-1, timeout=15)
# Verify that there is no replica lag and that it is correctly reported by ReplicaVerificationTool
wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10,
err_msg="Timed out waiting to reach zero replica lags.")

View File

@ -50,7 +50,7 @@ setup(name="kafkatest",
license="apache2.0",
packages=find_packages(),
include_package_data=True,
install_requires=["ducktape==0.5.3", "requests>=2.5.0"],
install_requires=["ducktape==0.6.0", "requests>=2.5.0"],
tests_require=["pytest", "mock"],
cmdclass={'test': PyTest},
)