mirror of https://github.com/apache/kafka.git
KAFKA-10402: Upgrade system tests to python3 (#9196)
For now, Kafka system tests use python2 which is outdated and not supported. This PR upgrades python to the third version. Reviewers: Ivan Daschinskiy, Mickael Maison <mickael.maison@gmail.com>, Magnus Edenhill <magnus@edenhill.se>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
40a23cc0c2
commit
4e65030e05
|
@ -33,9 +33,9 @@ LABEL ducker.creator=$ducker_creator
|
|||
|
||||
# Update Linux and install necessary utilities.
|
||||
# we have to install git since it is included in openjdk:8 but not openjdk:11
|
||||
RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
|
||||
RUN python -m pip install -U pip==9.0.3;
|
||||
RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9
|
||||
RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
|
||||
RUN python3 -m pip install -U pip==20.2.2;
|
||||
RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip3 install --upgrade ducktape==0.8.0
|
||||
|
||||
# Set up ssh
|
||||
COPY ./ssh-config /root/.ssh/config
|
||||
|
|
|
@ -270,7 +270,7 @@ setup_custom_ducktape() {
|
|||
docker_run ducker01 "${image_name}"
|
||||
local running_container="$(docker ps -f=network=ducknet -q)"
|
||||
must_do -v -o docker cp "${custom_ducktape}" "${running_container}:/opt/ducktape"
|
||||
docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python ./setup.py develop install && cd /opt/ducktape && sudo python ./setup.py develop install'
|
||||
docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python3 ./setup.py develop install && cd /opt/ducktape && sudo python3 ./setup.py develop install'
|
||||
[[ $? -ne 0 ]] && die "failed to install the new ducktape."
|
||||
must_do -v -o docker commit ducker01 "${image_name}"
|
||||
must_do -v docker kill "${running_container}"
|
||||
|
|
|
@ -140,8 +140,8 @@ class Benchmark(Test):
|
|||
# FIXME we should be generating a graph too
|
||||
# Try to break it into 5 blocks, but fall back to a smaller number if
|
||||
# there aren't even 5 elements
|
||||
block_size = max(len(self.producer.stats[0]) / 5, 1)
|
||||
nblocks = len(self.producer.stats[0]) / block_size
|
||||
block_size = max(len(self.producer.stats[0]) // 5, 1)
|
||||
nblocks = len(self.producer.stats[0]) // block_size
|
||||
|
||||
for i in range(nblocks):
|
||||
subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
import time
|
||||
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark import parametrize
|
||||
from ducktape.mark.resource import cluster
|
||||
from ducktape.tests.test import Test
|
||||
from ducktape.utils.util import wait_until
|
||||
|
|
|
@ -39,7 +39,7 @@ class TestVerifiableProducer(Test):
|
|||
self.num_messages = 1000
|
||||
# This will produce to source kafka cluster
|
||||
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
|
||||
max_messages=self.num_messages, throughput=self.num_messages/5)
|
||||
max_messages=self.num_messages, throughput=self.num_messages // 5)
|
||||
|
||||
def setUp(self):
|
||||
self.zk.start()
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import itertools
|
||||
import os
|
||||
|
||||
from ducktape.cluster.remoteaccount import RemoteCommandError
|
||||
|
|
|
@ -13,6 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from kafka import KafkaService
|
||||
from util import TopicPartition
|
||||
from config import KafkaConfig
|
||||
from .kafka import KafkaService
|
||||
from .util import TopicPartition
|
||||
from .config import KafkaConfig
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import config_property
|
||||
from . import config_property
|
||||
|
||||
|
||||
class KafkaConfig(dict):
|
||||
|
@ -34,7 +34,7 @@ class KafkaConfig(dict):
|
|||
|
||||
# Set defaults
|
||||
for key, val in self.DEFAULTS.items():
|
||||
if not self.has_key(key):
|
||||
if key not in self:
|
||||
self[key] = val
|
||||
|
||||
def render(self):
|
||||
|
|
|
@ -23,7 +23,7 @@ from ducktape.services.service import Service
|
|||
from ducktape.utils.util import wait_until
|
||||
from ducktape.cluster.remoteaccount import RemoteCommandError
|
||||
|
||||
from config import KafkaConfig
|
||||
from .config import KafkaConfig
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.kafka import config_property
|
||||
from kafkatest.services.monitor.jmx import JmxMixin
|
||||
|
@ -678,11 +678,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
|
||||
fields = line.split("\t")
|
||||
# ["Partition: 4", "Leader: 0"] -> ["4", "0"]
|
||||
fields = map(lambda x: x.split(" ")[1], fields)
|
||||
fields = list(map(lambda x: x.split(" ")[1], fields))
|
||||
partitions.append(
|
||||
{"topic": fields[0],
|
||||
"partition": int(fields[1]),
|
||||
"replicas": map(int, fields[3].split(','))})
|
||||
"replicas": list(map(int, fields[3].split(',')))})
|
||||
return {"partitions": partitions}
|
||||
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ inter.broker.listener.name={{ interbroker_listener.name }}
|
|||
security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
|
||||
{% endif %}
|
||||
|
||||
{% for k, v in listener_security_config.client_listener_overrides.iteritems() %}
|
||||
{% for k, v in listener_security_config.client_listener_overrides.items() %}
|
||||
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
|
||||
listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }}
|
||||
{% else %}
|
||||
|
@ -36,7 +36,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
|
|||
{% endfor %}
|
||||
|
||||
{% if interbroker_listener.name != security_protocol %}
|
||||
{% for k, v in listener_security_config.interbroker_listener_overrides.iteritems() %}
|
||||
{% for k, v in listener_security_config.interbroker_listener_overrides.items() %}
|
||||
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
|
||||
listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }}
|
||||
{% else %}
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from collections import defaultdict, namedtuple
|
||||
import json
|
||||
from threading import Thread
|
||||
|
@ -114,7 +114,7 @@ class HttpMetricsCollector(object):
|
|||
Get any collected metrics that match the specified parameters, yielding each as a tuple of
|
||||
(key, [<timestamp, value>, ...]) values.
|
||||
"""
|
||||
for k, values in self._http_metrics.iteritems():
|
||||
for k, values in self._http_metrics.items():
|
||||
if ((host is None or host == k.host) and
|
||||
(client_id is None or client_id == k.client_id) and
|
||||
(name is None or name == k.name) and
|
||||
|
@ -154,7 +154,7 @@ class _MetricsReceiver(BaseHTTPRequestHandler):
|
|||
name = raw_metric['name']
|
||||
group = raw_metric['group']
|
||||
# Convert to tuple of pairs because dicts & lists are unhashable
|
||||
tags = tuple([(k, v) for k, v in raw_metric['tags'].iteritems()]),
|
||||
tags = tuple((k, v) for k, v in raw_metric['tags'].items()),
|
||||
value = raw_metric['value']
|
||||
|
||||
key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags)
|
||||
|
|
|
@ -128,7 +128,7 @@ class JmxMixin(object):
|
|||
|
||||
for name in object_attribute_names:
|
||||
aggregates_per_time = []
|
||||
for time_sec in xrange(start_time_sec, end_time_sec + 1):
|
||||
for time_sec in range(start_time_sec, end_time_sec + 1):
|
||||
# assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
|
||||
values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
|
||||
# assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from performance import PerformanceService, throughput, latency, compute_aggregate_throughput
|
||||
from end_to_end_latency import EndToEndLatencyService
|
||||
from producer_performance import ProducerPerformanceService
|
||||
from consumer_performance import ConsumerPerformanceService
|
||||
from .performance import PerformanceService, throughput, latency, compute_aggregate_throughput
|
||||
from .end_to_end_latency import EndToEndLatencyService
|
||||
from .producer_performance import ProducerPerformanceService
|
||||
from .consumer_performance import ConsumerPerformanceService
|
||||
|
|
|
@ -78,7 +78,7 @@ class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
|
|||
'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
|
||||
'client_id': self.client_id,
|
||||
'kafka_run_class': self.path.script("kafka-run-class.sh", node),
|
||||
'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()])
|
||||
'metrics_props': ' '.join("%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.items())
|
||||
})
|
||||
|
||||
cmd = ""
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import random
|
||||
import uuid
|
||||
from io import open
|
||||
from os import remove, close
|
||||
from shutil import move
|
||||
|
|
|
@ -386,7 +386,7 @@ class SecurityConfig(TemplateRenderer):
|
|||
return ""
|
||||
if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' not in self.properties:
|
||||
raise Exception("JAAS configuration property has not yet been initialized")
|
||||
config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems())
|
||||
config_lines = (prefix + key + "=" + value for key, value in self.properties.items())
|
||||
# Extra blank lines ensure this can be appended/prepended safely
|
||||
return "\n".join(itertools.chain([""], config_lines, [""]))
|
||||
|
||||
|
|
|
@ -15,9 +15,8 @@
|
|||
|
||||
import os.path
|
||||
import signal
|
||||
import streams_property
|
||||
import consumer_property
|
||||
from ducktape.cluster.remoteaccount import RemoteCommandError
|
||||
from . import streams_property
|
||||
from . import consumer_property
|
||||
from ducktape.services.service import Service
|
||||
from ducktape.utils.util import wait_until
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
|
@ -231,7 +230,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
try:
|
||||
pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)]
|
||||
return [int(pid) for pid in pids]
|
||||
except Exception, exception:
|
||||
except Exception as exception:
|
||||
self.logger.debug(str(exception))
|
||||
return []
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
|
||||
|
||||
{% if loggers is defined %}
|
||||
{% for logger, log_level in loggers.iteritems() %}
|
||||
{% for logger, log_level in loggers.items() %}
|
||||
log4j.logger.{{ logger }}={{ log_level }}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
|
|
@ -44,7 +44,7 @@ class TaskSpec(object):
|
|||
"""
|
||||
node_names = []
|
||||
for obj in nodes:
|
||||
if isinstance(obj, basestring):
|
||||
if isinstance(obj, str):
|
||||
node_names.append(obj)
|
||||
else:
|
||||
node_names.append(obj.name)
|
||||
|
|
|
@ -18,7 +18,6 @@ from kafkatest.version import DEV_BRANCH, LATEST_0_8_2
|
|||
from ducktape.cluster.remoteaccount import RemoteCommandError
|
||||
|
||||
import importlib
|
||||
import os
|
||||
import subprocess
|
||||
import signal
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
|
|
@ -361,7 +361,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
|||
|
||||
def current_assignment(self):
|
||||
with self.lock:
|
||||
return { handler.node: handler.current_assignment() for handler in self.event_handlers.itervalues() }
|
||||
return { handler.node: handler.current_assignment() for handler in self.event_handlers.values() }
|
||||
|
||||
def current_position(self, tp):
|
||||
with self.lock:
|
||||
|
@ -372,7 +372,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
|||
|
||||
def owner(self, tp):
|
||||
with self.lock:
|
||||
for handler in self.event_handlers.itervalues():
|
||||
for handler in self.event_handlers.values():
|
||||
if tp in handler.current_assignment():
|
||||
return handler.node
|
||||
return None
|
||||
|
@ -386,33 +386,33 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
|||
|
||||
def total_consumed(self):
|
||||
with self.lock:
|
||||
return sum(handler.total_consumed for handler in self.event_handlers.itervalues())
|
||||
return sum(handler.total_consumed for handler in self.event_handlers.values())
|
||||
|
||||
def num_rebalances(self):
|
||||
with self.lock:
|
||||
return max(handler.assigned_count for handler in self.event_handlers.itervalues())
|
||||
return max(handler.assigned_count for handler in self.event_handlers.values())
|
||||
|
||||
def num_revokes_for_alive(self, keep_alive=1):
|
||||
with self.lock:
|
||||
return max([handler.revoked_count for handler in self.event_handlers.itervalues()
|
||||
if handler.idx <= keep_alive])
|
||||
return max(handler.revoked_count for handler in self.event_handlers.values()
|
||||
if handler.idx <= keep_alive)
|
||||
|
||||
def joined_nodes(self):
|
||||
with self.lock:
|
||||
return [handler.node for handler in self.event_handlers.itervalues()
|
||||
return [handler.node for handler in self.event_handlers.values()
|
||||
if handler.state == ConsumerState.Joined]
|
||||
|
||||
def rebalancing_nodes(self):
|
||||
with self.lock:
|
||||
return [handler.node for handler in self.event_handlers.itervalues()
|
||||
return [handler.node for handler in self.event_handlers.values()
|
||||
if handler.state == ConsumerState.Rebalancing]
|
||||
|
||||
def dead_nodes(self):
|
||||
with self.lock:
|
||||
return [handler.node for handler in self.event_handlers.itervalues()
|
||||
return [handler.node for handler in self.event_handlers.values()
|
||||
if handler.state == ConsumerState.Dead]
|
||||
|
||||
def alive_nodes(self):
|
||||
with self.lock:
|
||||
return [handler.node for handler in self.event_handlers.itervalues()
|
||||
return [handler.node for handler in self.event_handlers.values()
|
||||
if handler.state != ConsumerState.Dead]
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
|
||||
from ducktape.services.service import Service
|
||||
from ducktape.utils.util import wait_until
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
# limitations under the License.
|
||||
|
||||
import os
|
||||
|
||||
import errno
|
||||
import time
|
||||
from random import randint
|
||||
|
||||
|
@ -53,7 +55,7 @@ def run_command(node, cmd, ssh_log_file):
|
|||
f.write(line)
|
||||
except Exception as e:
|
||||
f.write("** Command failed!")
|
||||
print e
|
||||
print(e)
|
||||
raise
|
||||
|
||||
|
||||
|
@ -86,14 +88,14 @@ class ClientCompatibilityFeaturesTest(Test):
|
|||
"--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
|
||||
self.kafka.bootstrap_servers(),
|
||||
len(self.kafka.nodes),
|
||||
self.topics.keys()[0]))
|
||||
for k, v in features.iteritems():
|
||||
list(self.topics.keys())[0]))
|
||||
for k, v in features.items():
|
||||
cmd = cmd + ("--%s %s " % (k, v))
|
||||
results_dir = TestContext.results_dir(self.test_context, 0)
|
||||
try:
|
||||
os.makedirs(results_dir)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EEXIST and os.path.isdir(path):
|
||||
if e.errno == errno.EEXIST and os.path.isdir(results_dir):
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
|
|
@ -162,7 +162,7 @@ class QuotaTest(Test):
|
|||
jmx_attributes=['bytes-consumed-rate'], version=client_version)
|
||||
consumer.run()
|
||||
|
||||
for idx, messages in consumer.messages_consumed.iteritems():
|
||||
for idx, messages in consumer.messages_consumed.items():
|
||||
assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
|
||||
|
||||
success, msg = self.validate(self.kafka, producer, consumer)
|
||||
|
|
|
@ -26,6 +26,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer
|
|||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, LATEST_2_0, LATEST_1_1, LATEST_1_0, LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
|
||||
|
||||
from functools import reduce
|
||||
from collections import Counter, namedtuple
|
||||
import itertools
|
||||
import json
|
||||
|
@ -227,9 +228,9 @@ class ConnectDistributedTest(Test):
|
|||
err_msg="Failed to see connector transition to the PAUSED state")
|
||||
|
||||
# verify that we do not produce new messages while paused
|
||||
num_messages = len(self.source.sent_messages())
|
||||
num_messages = len(list(self.source.sent_messages()))
|
||||
time.sleep(10)
|
||||
assert num_messages == len(self.source.sent_messages()), "Paused source connector should not produce any messages"
|
||||
assert num_messages == len(list(self.source.sent_messages())), "Paused source connector should not produce any messages"
|
||||
|
||||
self.cc.resume_connector(self.source.name)
|
||||
|
||||
|
@ -238,7 +239,7 @@ class ConnectDistributedTest(Test):
|
|||
err_msg="Failed to see connector transition to the RUNNING state")
|
||||
|
||||
# after resuming, we should see records produced again
|
||||
wait_until(lambda: len(self.source.sent_messages()) > num_messages, timeout_sec=30,
|
||||
wait_until(lambda: len(list(self.source.sent_messages())) > num_messages, timeout_sec=30,
|
||||
err_msg="Failed to produce messages after resuming source connector")
|
||||
|
||||
@cluster(num_nodes=5)
|
||||
|
@ -258,7 +259,7 @@ class ConnectDistributedTest(Test):
|
|||
self.source = VerifiableSource(self.cc, topic=self.TOPIC)
|
||||
self.source.start()
|
||||
|
||||
wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30,
|
||||
wait_until(lambda: len(list(self.source.committed_messages())) > 0, timeout_sec=30,
|
||||
err_msg="Timeout expired waiting for source task to produce a message")
|
||||
|
||||
self.sink = VerifiableSink(self.cc, topics=[self.TOPIC])
|
||||
|
@ -275,9 +276,9 @@ class ConnectDistributedTest(Test):
|
|||
err_msg="Failed to see connector transition to the PAUSED state")
|
||||
|
||||
# verify that we do not consume new messages while paused
|
||||
num_messages = len(self.sink.received_messages())
|
||||
num_messages = len(list(self.sink.received_messages()))
|
||||
time.sleep(10)
|
||||
assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages"
|
||||
assert num_messages == len(list(self.sink.received_messages())), "Paused sink connector should not consume any messages"
|
||||
|
||||
self.cc.resume_connector(self.sink.name)
|
||||
|
||||
|
@ -286,7 +287,7 @@ class ConnectDistributedTest(Test):
|
|||
err_msg="Failed to see connector transition to the RUNNING state")
|
||||
|
||||
# after resuming, we should see records consumed again
|
||||
wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
|
||||
wait_until(lambda: len(list(self.sink.received_messages())) > num_messages, timeout_sec=30,
|
||||
err_msg="Failed to consume messages after resuming sink connector")
|
||||
|
||||
@cluster(num_nodes=5)
|
||||
|
@ -420,11 +421,11 @@ class ConnectDistributedTest(Test):
|
|||
src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task]
|
||||
# Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
|
||||
# bouncing should commit on rebalance.
|
||||
src_seqno_max = max(src_seqnos)
|
||||
src_seqno_max = max(src_seqnos) if len(src_seqnos) else 0
|
||||
self.logger.debug("Max source seqno: %d", src_seqno_max)
|
||||
src_seqno_counts = Counter(src_seqnos)
|
||||
missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos)))
|
||||
duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1])
|
||||
duplicate_src_seqnos = sorted(seqno for seqno,count in src_seqno_counts.items() if count > 1)
|
||||
|
||||
if missing_src_seqnos:
|
||||
self.logger.error("Missing source sequence numbers for task " + str(task))
|
||||
|
@ -440,11 +441,11 @@ class ConnectDistributedTest(Test):
|
|||
sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task]
|
||||
# Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because
|
||||
# clean bouncing should commit on rebalance.
|
||||
sink_seqno_max = max(sink_seqnos)
|
||||
sink_seqno_max = max(sink_seqnos) if len(sink_seqnos) else 0
|
||||
self.logger.debug("Max sink seqno: %d", sink_seqno_max)
|
||||
sink_seqno_counts = Counter(sink_seqnos)
|
||||
missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos)))
|
||||
duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1])
|
||||
duplicate_sink_seqnos = sorted(seqno for seqno,count in iter(sink_seqno_counts.items()) if count > 1)
|
||||
|
||||
if missing_sink_seqnos:
|
||||
self.logger.error("Missing sink sequence numbers for task " + str(task))
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
from ducktape.tests.test import Test
|
||||
from ducktape.mark.resource import cluster
|
||||
from ducktape.utils.util import wait_until
|
||||
from ducktape.mark import parametrize, matrix
|
||||
from ducktape.mark import parametrize
|
||||
from ducktape.cluster.remoteaccount import RemoteCommandError
|
||||
from ducktape.errors import TimeoutError
|
||||
|
||||
|
@ -28,7 +28,6 @@ from kafkatest.services.security.security_config import SecurityConfig
|
|||
|
||||
import hashlib
|
||||
import json
|
||||
import os.path
|
||||
|
||||
|
||||
class ConnectStandaloneFileTest(Test):
|
||||
|
@ -130,7 +129,7 @@ class ConnectStandaloneFileTest(Test):
|
|||
def validate_output(self, value):
|
||||
try:
|
||||
output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
|
||||
return output_hash == hashlib.md5(value).hexdigest()
|
||||
return output_hash == hashlib.md5(value.encode('utf-8')).hexdigest()
|
||||
except RemoteCommandError:
|
||||
return False
|
||||
|
||||
|
|
|
@ -17,14 +17,9 @@ from ducktape.mark import parametrize, matrix
|
|||
from ducktape.mark.resource import cluster
|
||||
from ducktape.utils.util import wait_until
|
||||
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.kafka import config_property
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.end_to_end import EndToEndTest
|
||||
from kafkatest.utils import is_int
|
||||
from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
|
||||
from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, KafkaVersion
|
||||
|
||||
class TestDowngrade(EndToEndTest):
|
||||
PARTITIONS = 3
|
||||
|
|
|
@ -22,7 +22,6 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
|||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
|
||||
TOPIC = "topic-get-offset-shell"
|
||||
MAX_MESSAGES = 100
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from ducktape.utils.util import wait_until
|
||||
from ducktape.mark import parametrize, matrix
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
|
|
|
@ -129,7 +129,7 @@ class NetworkDegradeTest(Test):
|
|||
self.logger.info("Measured rates: %s" % measured_rates)
|
||||
|
||||
# We expect to see measured rates within an order of magnitude of our target rate
|
||||
low_kbps = rate_limit_kbit / 10
|
||||
low_kbps = rate_limit_kbit // 10
|
||||
high_kbps = rate_limit_kbit * 10
|
||||
acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps]
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
|
|||
self.logger.debug("Jumble partition assignment with seed " + str(seed))
|
||||
random.seed(seed)
|
||||
# The list may still be in order, but that's ok
|
||||
shuffled_list = range(0, self.num_partitions)
|
||||
shuffled_list = list(range(0, self.num_partitions))
|
||||
random.shuffle(shuffled_list)
|
||||
|
||||
for i in range(0, self.num_partitions):
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
from ducktape.mark.resource import cluster
|
||||
from ducktape.mark import parametrize
|
||||
from ducktape.tests.test import Test
|
||||
from ducktape.utils.util import wait_until
|
||||
|
||||
from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
|
||||
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
|
||||
|
@ -25,7 +24,6 @@ from kafkatest.services.kafka import KafkaService
|
|||
from kafkatest.services.trogdor.trogdor import TrogdorService
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
|
||||
|
@ -48,7 +46,7 @@ class ReplicaScaleTest(Test):
|
|||
self.zk.stop()
|
||||
|
||||
@cluster(num_nodes=12)
|
||||
@parametrize(topic_count=500, partition_count=34, replication_factor=3)
|
||||
@parametrize(topic_count=50, partition_count=34, replication_factor=3)
|
||||
def test_produce_consume(self, topic_count, partition_count, replication_factor):
|
||||
topics_create_start_time = time.time()
|
||||
for i in range(topic_count):
|
||||
|
@ -103,7 +101,7 @@ class ReplicaScaleTest(Test):
|
|||
trogdor.stop()
|
||||
|
||||
@cluster(num_nodes=12)
|
||||
@parametrize(topic_count=500, partition_count=34, replication_factor=3)
|
||||
@parametrize(topic_count=50, partition_count=34, replication_factor=3)
|
||||
def test_clean_bounce(self, topic_count, partition_count, replication_factor):
|
||||
topics_create_start_time = time.time()
|
||||
for i in range(topic_count):
|
||||
|
|
|
@ -73,7 +73,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
|
|||
self.num_records = 2000
|
||||
self.record_size = 4096 * 100 # 400 KB
|
||||
# 1 MB per partition on average.
|
||||
self.partition_size = (self.num_records * self.record_size) / self.num_partitions
|
||||
self.partition_size = (self.num_records * self.record_size) // self.num_partitions
|
||||
self.num_producers = 2
|
||||
self.num_consumers = 1
|
||||
self.throttle = 4 * 1024 * 1024 # 4 MB/s
|
||||
|
|
|
@ -248,7 +248,7 @@ class TransactionsTest(Test):
|
|||
# We reduce the number of seed messages to copy to account for the fewer output
|
||||
# partitions, and thus lower parallelism. This helps keep the test
|
||||
# time shorter.
|
||||
self.num_seed_messages = self.num_seed_messages / 3
|
||||
self.num_seed_messages = self.num_seed_messages // 3
|
||||
self.num_input_partitions = 1
|
||||
self.num_output_partitions = 1
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark import matrix, ignore
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark import matrix, ignore
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
|
@ -23,8 +22,6 @@ from kafkatest.services.console_consumer import ConsoleConsumer
|
|||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
|
||||
import logging
|
||||
|
||||
class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest):
|
||||
"""Tests TLS encryption-only (ssl.clientAuth=none) connectivity to zookeeper.
|
||||
"""
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark import matrix, ignore
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
|
@ -23,8 +22,6 @@ from kafkatest.services.console_consumer import ConsoleConsumer
|
|||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
|
||||
import logging
|
||||
|
||||
class ZookeeperTlsTest(ProduceConsumeValidateTest):
|
||||
"""Tests TLS connectivity to zookeeper.
|
||||
"""
|
||||
|
|
|
@ -23,8 +23,6 @@ from kafkatest.services.verifiable_consumer import VerifiableConsumer
|
|||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.utils import validate_delivery
|
||||
|
||||
import time
|
||||
|
||||
class EndToEndTest(Test):
|
||||
"""This class provides a shared template for tests which follow the common pattern of:
|
||||
|
||||
|
@ -87,7 +85,7 @@ class EndToEndTest(Test):
|
|||
|
||||
def await_consumed_offsets(self, last_acked_offsets, timeout_sec):
|
||||
def has_finished_consuming():
|
||||
for partition, offset in last_acked_offsets.iteritems():
|
||||
for partition, offset in last_acked_offsets.items():
|
||||
if not partition in self.last_consumed_offsets:
|
||||
return False
|
||||
last_commit = self.consumer.last_commit(partition)
|
||||
|
|
|
@ -17,7 +17,7 @@ from ducktape.utils.util import wait_until
|
|||
from ducktape.tests.test import Test
|
||||
from ducktape.mark.resource import cluster
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark import parametrize, ignore
|
||||
from ducktape.mark import ignore
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
|
||||
|
@ -119,7 +119,7 @@ class StreamsBrokerBounceTest(Test):
|
|||
def fail_broker_type(self, failure_mode, broker_type):
|
||||
# Pick a random topic and bounce it's leader
|
||||
topic_index = randint(0, len(self.topics.keys()) - 1)
|
||||
topic = self.topics.keys()[topic_index]
|
||||
topic = list(self.topics.keys())[topic_index]
|
||||
failures[failure_mode](self, topic, broker_type)
|
||||
|
||||
def fail_many_brokers(self, failure_mode, num_failures):
|
||||
|
|
|
@ -20,8 +20,7 @@ from kafkatest.services.kafka import KafkaService
|
|||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
|
||||
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
|
||||
DEV_BRANCH, DEV_VERSION, KafkaVersion
|
||||
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3
|
||||
from kafkatest.services.streams import CooperativeRebalanceUpgradeService
|
||||
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
|
||||
|
||||
{% if loggers is defined %}
|
||||
{% for logger, log_level in loggers.iteritems() %}
|
||||
{% for logger, log_level in loggers.items() %}
|
||||
log4j.logger.{{ logger }}={{ log_level }}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
|
|
@ -13,4 +13,4 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id
|
||||
from .util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id
|
||||
|
|
|
@ -23,7 +23,6 @@ from kafkatest.services.zookeeper import ZookeeperService
|
|||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
|
||||
TOPIC = "topic-log4j-appender"
|
||||
MAX_MESSAGES = 100
|
||||
|
|
|
@ -40,7 +40,7 @@ class VerifiableConsumerTest(KafkaTest):
|
|||
|
||||
def _partitions(self, assignment):
|
||||
partitions = []
|
||||
for parts in assignment.itervalues():
|
||||
for parts in assignment.values():
|
||||
partitions += parts
|
||||
return partitions
|
||||
|
||||
|
|
|
@ -13,4 +13,4 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery
|
||||
from .util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery
|
||||
|
|
|
@ -14,9 +14,7 @@
|
|||
|
||||
from kafkatest import __version__ as __kafkatest_version__
|
||||
|
||||
import math
|
||||
import re
|
||||
import time
|
||||
|
||||
|
||||
def kafkatest_version():
|
||||
|
|
|
@ -49,6 +49,19 @@ class KafkaVersion(LooseVersion):
|
|||
else:
|
||||
return LooseVersion.__str__(self)
|
||||
|
||||
def _cmp(self, other):
|
||||
if isinstance(other, str):
|
||||
other = KafkaVersion(other)
|
||||
|
||||
if other.is_dev:
|
||||
if self.is_dev:
|
||||
return 0
|
||||
return -1
|
||||
elif self.is_dev:
|
||||
return 1
|
||||
|
||||
return LooseVersion._cmp(self, other)
|
||||
|
||||
def supports_named_listeners(self):
|
||||
return self >= V_0_10_2_0
|
||||
|
||||
|
|
Loading…
Reference in New Issue