KAFKA-17923 Remove old kafka version from e2e (#17673)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-11-11 06:22:06 +08:00 committed by GitHub
parent 9c0fe85605
commit 440e0b8801
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 60 additions and 353 deletions

View File

@ -77,15 +77,6 @@ RUN echo 'PermitUserEnvironment yes' >> /etc/ssh/sshd_config
# Install binary test dependencies. # Install binary test dependencies.
# we use the same versions as in vagrant/base.sh # we use the same versions as in vagrant/base.sh
ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2"
RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3"
RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2"
RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1"
RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1"
RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1" RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1"
RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2"
RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1"
@ -106,13 +97,6 @@ RUN mkdir -p "/opt/kafka-3.8.1" && chmod a+rw /opt/kafka-3.8.1 && curl -s "$KAFK
# Streams test dependencies # Streams test dependencies
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar

View File

@ -16,7 +16,7 @@
import importlib import importlib
import os import os
from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH, LATEST_0_9, LATEST_3_5 from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH, LATEST_3_5
"""This module serves a few purposes: """This module serves a few purposes:
@ -55,11 +55,6 @@ JARS = {
# This version of the file connectors does not contain ServiceLoader manifests # This version of the file connectors does not contain ServiceLoader manifests
LATEST_3_5.__str__(): { LATEST_3_5.__str__(): {
CONNECT_FILE_JAR: "libs/connect-file*.jar" CONNECT_FILE_JAR: "libs/connect-file*.jar"
},
# TODO: This is only used in 0.8.2.x system tests, remove with KAFKA-14762
LATEST_0_9.__str__(): {
TOOLS_JAR_NAME: "libs/kafka-tools*.jar",
TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/{argparse4j,jackson}*.jar"
} }
} }

View File

@ -22,10 +22,8 @@ from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils.remote_account import line_count, file_exists from kafkatest.utils.remote_account import line_count, file_exists
from kafkatest.version import LATEST_0_8_2
class ConsoleConsumerTest(Test): class ConsoleConsumerTest(Test):
@ -77,24 +75,3 @@ class ConsoleConsumerTest(Test):
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
self.consumer.stop_node(node) self.consumer.stop_node(node)
@cluster(num_nodes=4)
def test_version(self):
"""Check that console consumer v0.8.2.X successfully starts and consumes messages."""
self.kafka.start()
num_messages = 1000
self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
max_messages=num_messages, throughput=1000)
self.producer.start()
self.producer.wait()
self.consumer.nodes[0].version = LATEST_0_8_2
self.consumer.new_consumer = False
self.consumer.consumer_timeout_ms = 1000
self.consumer.start()
self.consumer.wait()
num_consumed = len(self.consumer.messages_consumed[1])
num_produced = self.producer.num_acked
assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)

View File

@ -21,7 +21,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
from kafkatest.services.performance import latency, compute_aggregate_throughput from kafkatest.services.performance import latency, compute_aggregate_throughput
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_1_1, KafkaVersion from kafkatest.version import DEV_BRANCH, LATEST_2_1, KafkaVersion
class PerformanceServiceTest(Test): class PerformanceServiceTest(Test):
@ -38,15 +38,8 @@ class PerformanceServiceTest(Test):
self.zk.start() self.zk.start()
@cluster(num_nodes=5) @cluster(num_nodes=5)
# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, @matrix(version=[str(LATEST_2_1), str(DEV_BRANCH)], metadata_quorum=quorum.all)
# the overhead should be manageable. def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk):
@parametrize(version=str(LATEST_0_8_2), new_consumer=False)
@parametrize(version=str(LATEST_0_9), new_consumer=False)
@parametrize(version=str(LATEST_0_9))
@parametrize(version=str(LATEST_1_1), new_consumer=False)
@cluster(num_nodes=5)
@matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all)
def test_version(self, version=str(LATEST_0_9), new_consumer=True, metadata_quorum=quorum.zk):
""" """
Sanity check out producer performance service - verify that we can run the service with a small Sanity check out producer performance service - verify that we can run the service with a small
number of messages. The actual stats here are pretty meaningless since the number of messages is quite small. number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.
@ -80,7 +73,7 @@ class PerformanceServiceTest(Test):
# check basic run of consumer performance service # check basic run of consumer performance service
self.consumer_perf = ConsumerPerformanceService( self.consumer_perf = ConsumerPerformanceService(
self.test_context, 1, self.kafka, new_consumer=new_consumer, self.test_context, 1, self.kafka,
topic=self.topic, version=version, messages=self.num_records) topic=self.topic, version=version, messages=self.num_records)
self.consumer_perf.group = "test-consumer-group" self.consumer_perf.group = "test-consumer-group"
self.consumer_perf.run() self.consumer_perf.run()

View File

@ -23,7 +23,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils import is_version from kafkatest.utils import is_version
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, DEV_BRANCH, KafkaVersion from kafkatest.version import DEV_BRANCH, KafkaVersion
class TestVerifiableProducer(Test): class TestVerifiableProducer(Test):
@ -45,10 +45,6 @@ class TestVerifiableProducer(Test):
self.zk.start() self.zk.start()
@cluster(num_nodes=3) @cluster(num_nodes=3)
@parametrize(producer_version=str(LATEST_0_8_2))
@parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(LATEST_0_10_0))
@parametrize(producer_version=str(LATEST_0_10_1))
@matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], enable_idempotence=[False]) @matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], enable_idempotence=[False])
@matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], enable_idempotence=[True]) @matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], enable_idempotence=[True])
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all) @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
@ -81,19 +77,6 @@ class TestVerifiableProducer(Test):
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15, wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
err_msg="Producer failed to start in a reasonable amount of time.") err_msg="Producer failed to start in a reasonable amount of time.")
# using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
# that this check works with DEV_BRANCH
# When running VerifiableProducer 0.8.X, both the current branch version and 0.8.X should show up because of the
# way verifiable producer pulls in some development directories into its classpath
#
# If the test fails here because 'ps .. | grep' couldn't find the process it means
# the login and grep that is_version() performs is slower than
# the time it takes the producer to produce its messages.
# Easy fix is to decrease throughput= above, the good fix is to make the producer
# not terminate until explicitly killed in this case.
if node.version <= LATEST_0_8_2:
assert is_version(node, [node.version.vstring, LATEST_0_9.vstring], logger=self.logger)
else:
assert is_version(node, [node.version.vstring], logger=self.logger) assert is_version(node, [node.version.vstring], logger=self.logger)
self.producer.wait() self.producer.wait()

View File

@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 from kafkatest.version import DEV_BRANCH, LATEST_3_7
from kafkatest.services.kafka.util import fix_opts_for_new_jvm from kafkatest.services.kafka.util import fix_opts_for_new_jvm
""" """
@ -118,9 +118,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
self.isolation_level = isolation_level self.isolation_level = isolation_level
self.enable_systest_events = enable_systest_events self.enable_systest_events = enable_systest_events
if self.enable_systest_events:
# Only available in 0.10.0 and up
assert version >= V_0_10_0_0
self.print_timestamp = print_timestamp self.print_timestamp = print_timestamp
self.jaas_override_variables = jaas_override_variables or {} self.jaas_override_variables = jaas_override_variables or {}
@ -134,10 +131,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
"""Return a string which can be used to create a configuration file appropriate for the given node.""" """Return a string which can be used to create a configuration file appropriate for the given node."""
# Process client configuration # Process client configuration
prop_file = self.render('console_consumer.properties') prop_file = self.render('console_consumer.properties')
if hasattr(node, "version") and node.version <= LATEST_0_8_2:
# in 0.8.2.X and earlier, console consumer does not have --timeout-ms option
# instead, we have to pass it through the config file
prop_file += "\nconsumer.timeout.ms=%s\n" % str(self.consumer_timeout_ms)
# Add security properties to the config. If security protocol is not specified, # Add security properties to the config. If security protocol is not specified,
# use the default in the template properties. # use the default in the template properties.
@ -176,19 +169,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
"%(console_consumer)s " \ "%(console_consumer)s " \
"--topic %(topic)s " \ "--topic %(topic)s " \
"--consumer.config %(config_file)s " % args "--consumer.config %(config_file)s " % args
if self.new_consumer:
assert node.version.consumer_supports_bootstrap_server(), \
"new_consumer is only supported if version >= 0.9.0.0, version %s" % str(node.version)
if node.version <= LATEST_0_10_0:
cmd += " --new-consumer"
cmd += " --bootstrap-server %(broker_list)s" % args cmd += " --bootstrap-server %(broker_list)s" % args
if node.version >= V_0_11_0_0:
cmd += " --isolation-level %s" % self.isolation_level cmd += " --isolation-level %s" % self.isolation_level
else:
assert node.version < V_2_0_0, \
"new_consumer==false is only supported if version < 2.0.0, version %s" % str(node.version)
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning: if self.from_beginning:
cmd += " --from-beginning" cmd += " --from-beginning"
@ -196,7 +178,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
if self.consumer_timeout_ms is not None: if self.consumer_timeout_ms is not None:
# version 0.8.X and below do not support --timeout-ms option # version 0.8.X and below do not support --timeout-ms option
# This will be added in the properties file instead # This will be added in the properties file instead
if node.version > LATEST_0_8_2:
cmd += " --timeout-ms %s" % self.consumer_timeout_ms cmd += " --timeout-ms %s" % self.consumer_timeout_ms
if self.print_timestamp: if self.print_timestamp:
@ -209,16 +190,12 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
cmd += " --property print.partition=true" cmd += " --property print.partition=true"
# LoggingMessageFormatter was introduced after 0.9 # LoggingMessageFormatter was introduced after 0.9
if node.version > LATEST_0_9:
if node.version > LATEST_3_7: if node.version > LATEST_3_7:
cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter"
else: else:
cmd += " --formatter kafka.tools.LoggingMessageFormatter" cmd += " --formatter kafka.tools.LoggingMessageFormatter"
if self.enable_systest_events: if self.enable_systest_events:
# enable systest events is only available in 0.10.0 and later
# check the assertion here as well, in case node.version has been modified
assert node.version >= V_0_10_0_0
cmd += " --enable-systest-events" cmd += " --enable-systest-events"
if self.consumer_properties is not None: if self.consumer_properties is not None:

View File

@ -41,12 +41,7 @@ listener.security.protocol.map={{ listener_security_protocol_map }}
{% if quorum_info.using_zk or quorum_info.has_brokers %} {% if quorum_info.using_zk or quorum_info.has_brokers %}
advertised.host.name={{ node.account.hostname }} advertised.host.name={{ node.account.hostname }}
advertised.listeners={{ advertised_listeners }} advertised.listeners={{ advertised_listeners }}
{% if node.version.supports_named_listeners() %}
inter.broker.listener.name={{ interbroker_listener.name }} inter.broker.listener.name={{ interbroker_listener.name }}
{% else %}
security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
{% endif %}
{% endif %} {% endif %}
{% for k, v in listener_security_config.client_listener_overrides.items() %} {% for k, v in listener_security_config.client_listener_overrides.items() %}

View File

@ -16,12 +16,9 @@
from collections import namedtuple from collections import namedtuple
from kafkatest.utils.remote_account import java_version from kafkatest.utils.remote_account import java_version
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition']) TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)])
def fix_opts_for_new_jvm(node): def fix_opts_for_new_jvm(node):
# Startup scripts for early versions of Kafka contains options # Startup scripts for early versions of Kafka contains options
# that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC. # that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC.
@ -31,12 +28,6 @@ def fix_opts_for_new_jvm(node):
if java_ver <= 9: if java_ver <= 9:
return "" return ""
cmd = "" return ""
# check kafka version for kafka node types
if hasattr(node, 'version'):
if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version == LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
cmd += "export KAFKA_GC_LOG_OPTS=\"-Xlog:gc*:file=kafka-gc.log:time,tags:filecount=10,filesize=102400\"; "
cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; "
return cmd

View File

@ -19,7 +19,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.version import get_version, V_0_11_0_0, V_3_4_0, DEV_BRANCH from kafkatest.version import get_version, V_3_4_0, DEV_BRANCH
class JmxMixin(object): class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats. """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
@ -139,9 +139,6 @@ class JmxMixin(object):
# To correctly wait for requested JMX metrics to be added we need the --wait option for JmxTool. This option was # To correctly wait for requested JMX metrics to be added we need the --wait option for JmxTool. This option was
# not added until 0.11.0.1, so any earlier versions need to use JmxTool from a newer version. # not added until 0.11.0.1, so any earlier versions need to use JmxTool from a newer version.
version = get_version(node) version = get_version(node)
if version <= V_0_11_0_0:
return DEV_BRANCH
else:
return version return version
def jmx_class_name(self, version): def jmx_class_name(self, version):

View File

@ -18,8 +18,7 @@ import os
from kafkatest.services.kafka.util import fix_opts_for_new_jvm from kafkatest.services.kafka.util import fix_opts_for_new_jvm
from kafkatest.services.performance import PerformanceService from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import V_2_5_0, DEV_BRANCH
from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0
class ConsumerPerformanceService(PerformanceService): class ConsumerPerformanceService(PerformanceService):
@ -65,25 +64,14 @@ class ConsumerPerformanceService(PerformanceService):
"collect_default": True} "collect_default": True}
} }
def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANCH, new_consumer=True, settings={}): def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANCH, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes) super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka self.kafka = kafka
self.security_config = kafka.security_config.client_config() self.security_config = kafka.security_config.client_config()
self.topic = topic self.topic = topic
self.messages = messages self.messages = messages
self.new_consumer = new_consumer
self.settings = settings self.settings = settings
assert version.consumer_supports_bootstrap_server() or (not new_consumer), \
"new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version)
assert version < V_2_0_0 or new_consumer, \
"new_consumer==false is only supported if version < 2.0.0, version %s" % str(version)
security_protocol = self.security_config.security_protocol
assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \
"Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
# These less-frequently used settings can be updated manually after instantiation # These less-frequently used settings can be updated manually after instantiation
self.fetch_size = None self.fetch_size = None
self.socket_buffer_size = None self.socket_buffer_size = None
@ -97,17 +85,13 @@ class ConsumerPerformanceService(PerformanceService):
"""Dictionary of arguments used to start the Consumer Performance script.""" """Dictionary of arguments used to start the Consumer Performance script."""
args = { args = {
'topic': self.topic, 'topic': self.topic,
'messages': self.messages, 'messages': self.messages
} }
if self.new_consumer: if version < V_2_5_0:
if version <= LATEST_0_10_0:
args['new-consumer'] = ""
args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
else: else:
args['bootstrap-server'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) args['bootstrap-server'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
args['zookeeper'] = self.kafka.zk_connect_setting()
if self.fetch_size is not None: if self.fetch_size is not None:
args['fetch-size'] = self.fetch_size args['fetch-size'] = self.fetch_size
@ -132,8 +116,6 @@ class ConsumerPerformanceService(PerformanceService):
for key, value in self.args(node.version).items(): for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value) cmd += " --%s %s" % (key, value)
if node.version.consumer_supports_bootstrap_server():
# This is only used for security settings
cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
for key, value in self.settings.items(): for key, value in self.settings.items():
@ -143,22 +125,6 @@ class ConsumerPerformanceService(PerformanceService):
'stderr': ConsumerPerformanceService.STDERR_CAPTURE} 'stderr': ConsumerPerformanceService.STDERR_CAPTURE}
return cmd return cmd
def parse_results(self, line, version):
parts = line.split(',')
if version.consumer_supports_bootstrap_server():
result = {
'total_mb': float(parts[2]),
'mbps': float(parts[3]),
'records_per_sec': float(parts[5]),
}
else:
result = {
'total_mb': float(parts[3]),
'mbps': float(parts[4]),
'records_per_sec': float(parts[6]),
}
return result
def _worker(self, idx, node): def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False) node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
@ -174,4 +140,10 @@ class ConsumerPerformanceService(PerformanceService):
last = line last = line
# Parse and save the last line's information # Parse and save the last line's information
self.results[idx-1] = self.parse_results(last, node.version) if last is not None:
parts = last.split(',')
self.results[idx-1] = {
'total_mb': float(parts[2]),
'mbps': float(parts[3]),
'records_per_sec': float(parts[5]),
}

View File

@ -53,14 +53,6 @@ class EndToEndLatencyService(PerformanceService):
self.security_config = kafka.security_config.client_config() self.security_config = kafka.security_config.client_config()
self.version = '' self.version = ''
security_protocol = self.security_config.security_protocol
if not version.consumer_supports_bootstrap_server():
assert security_protocol == SecurityConfig.PLAINTEXT, \
"Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
assert compression_type == "none", \
"Compression type %s is only supported if version >= 0.9.0.0, version %s" % (compression_type, str(version))
self.args = { self.args = {
'topic': topic, 'topic': topic,
'num_records': num_records, 'num_records': num_records,
@ -82,20 +74,11 @@ class EndToEndLatencyService(PerformanceService):
'kafka_run_class': self.path.script("kafka-run-class.sh", node), 'kafka_run_class': self.path.script("kafka-run-class.sh", node),
'java_class_name': self.java_class_name() 'java_class_name': self.java_class_name()
}) })
if not node.version.consumer_supports_bootstrap_server():
args.update({
'zk_connect': self.kafka.zk_connect_setting(),
})
cmd = fix_opts_for_new_jvm(node) cmd = fix_opts_for_new_jvm(node)
cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
if node.version.consumer_supports_bootstrap_server():
cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args
cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
else:
# Set fetch max wait to 0 to match behavior in later versions
cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s kafka.tools.TestEndToEndLatency " % args
cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args
cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': EndToEndLatencyService.STDOUT_CAPTURE, cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': EndToEndLatencyService.STDOUT_CAPTURE,
'stderr': EndToEndLatencyService.STDERR_CAPTURE} 'stderr': EndToEndLatencyService.STDERR_CAPTURE}
@ -109,7 +92,6 @@ class EndToEndLatencyService(PerformanceService):
node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config) node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config)
client_config = str(self.security_config) client_config = str(self.security_config)
if node.version.consumer_supports_bootstrap_server():
client_config += "compression_type=%(compression_type)s" % self.args client_config += "compression_type=%(compression_type)s" % self.args
node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config) node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config)

View File

@ -55,10 +55,6 @@ class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
self.kafka = kafka self.kafka = kafka
self.security_config = kafka.security_config.client_config() self.security_config = kafka.security_config.client_config()
security_protocol = self.security_config.security_protocol
assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \
"Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
self.args = { self.args = {
'topic': topic, 'topic': topic,
'kafka_opts': self.security_config.kafka_opts, 'kafka_opts': self.security_config.kafka_opts,

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9
from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.cluster.remoteaccount import RemoteCommandError
import importlib import importlib
@ -249,17 +247,7 @@ class VerifiableClientJava (VerifiableClient):
def exec_cmd (self, node): def exec_cmd (self, node):
""" :return: command to execute to start instance """ :return: command to execute to start instance
Translates Verifiable* to the corresponding Java client class name """ Translates Verifiable* to the corresponding Java client class name """
cmd = "" cmd = fix_opts_for_new_jvm(node)
if self.java_class_name == 'VerifiableProducer' and node.version <= LATEST_0_8_2:
# 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
# the tools jar from 0.9.x to the classpath
# TODO remove with KAFKA-14762
tools_jar = self.parent.path.jar(TOOLS_JAR_NAME, LATEST_0_9)
tools_dependant_libs_jar = self.parent.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, LATEST_0_9)
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
cmd += "export CLASSPATH; "
cmd += fix_opts_for_new_jvm(node)
cmd += self.parent.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + self.java_class_name cmd += self.parent.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + self.java_class_name
return cmd return cmd

View File

@ -21,7 +21,7 @@ from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition, consumer_group from kafkatest.services.kafka import TopicPartition, consumer_group
from kafkatest.services.verifiable_client import VerifiableClientMixin from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0, V_4_0_0 from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0
class ConsumerState: class ConsumerState:
@ -317,10 +317,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
"Version %s does not support static membership (must be 2.3 or higher)" % str(node.version) "Version %s does not support static membership (must be 2.3 or higher)" % str(node.version)
node.group_instance_id = self.group_id + "-instance-" + str(idx) node.group_instance_id = self.group_id + "-instance-" + str(idx)
if self.assignment_strategy:
assert node.version >= V_0_10_0_0, \
"Version %s does not setting an assignment strategy (must be 0.10.0 or higher)" % str(node.version)
cmd = self.start_cmd(node) cmd = self.start_cmd(node)
self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd)) self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))

View File

@ -26,8 +26,8 @@ from ducktape.tests.test import TestContext
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.kafka import KafkaService, quorum
from ducktape.tests.test import Test from ducktape.tests.test import Test
from kafkatest.version import DEV_BRANCH, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import DEV_BRANCH, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
def get_broker_features(broker_version): def get_broker_features(broker_version):
@ -107,9 +107,6 @@ class ClientCompatibilityFeaturesTest(Test):
@cluster(num_nodes=7) @cluster(num_nodes=7)
@matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade)
@parametrize(broker_version=str(LATEST_1_0))
@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_2_1)) @parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_2)) @parametrize(broker_version=str(LATEST_2_2))
@parametrize(broker_version=str(LATEST_2_3)) @parametrize(broker_version=str(LATEST_2_3))

View File

@ -23,8 +23,8 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix from kafkatest.utils import is_int_with_prefix
from kafkatest.version import DEV_BRANCH, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import DEV_BRANCH, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest): class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
@ -58,9 +58,6 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade)
@parametrize(broker_version=str(LATEST_1_0))
@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_2_1)) @parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_2)) @parametrize(broker_version=str(LATEST_2_2))
@parametrize(broker_version=str(LATEST_2_3)) @parametrize(broker_version=str(LATEST_2_3))

View File

@ -21,7 +21,7 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.version import DEV_BRANCH, LATEST_1_1 from kafkatest.version import DEV_BRANCH
class QuotaConfig(object): class QuotaConfig(object):
CLIENT_ID = 'client-id' CLIENT_ID = 'client-id'
@ -129,23 +129,13 @@ class QuotaTest(Test):
@cluster(num_nodes=5) @cluster(num_nodes=5)
@matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False]) @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
@parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2) @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
@parametrize(quota_type=QuotaConfig.CLIENT_ID, old_broker_throttling_behavior=True) def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
@parametrize(quota_type=QuotaConfig.CLIENT_ID, old_client_throttling_behavior=True)
def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1,
old_broker_throttling_behavior=False, old_client_throttling_behavior=False):
# Old (pre-2.0) throttling behavior for broker throttles before sending a response to the client.
if old_broker_throttling_behavior:
self.kafka.set_version(LATEST_1_1)
self.kafka.start() self.kafka.start()
self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka) self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
producer_client_id = self.quota_config.client_id producer_client_id = self.quota_config.client_id
consumer_client_id = self.quota_config.client_id consumer_client_id = self.quota_config.client_id
# Old (pre-2.0) throttling behavior for client does not throttle upon receiving a response with a non-zero throttle time.
if old_client_throttling_behavior:
client_version = LATEST_1_1
else:
client_version = DEV_BRANCH client_version = DEV_BRANCH
# Produce all messages # Produce all messages

View File

@ -24,7 +24,7 @@ from kafkatest.services.kafka import KafkaService, config_property, quorum, cons
from kafkatest.services.connect import ConnectDistributedService, ConnectServiceBase, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource from kafkatest.services.connect import ConnectDistributedService, ConnectServiceBase, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, LATEST_2_0, LATEST_1_1, LATEST_1_0, KafkaVersion from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, KafkaVersion
from functools import reduce from functools import reduce
from collections import Counter, namedtuple from collections import Counter, namedtuple

View File

@ -21,8 +21,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, \ from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \ LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
@ -48,8 +47,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@cluster(num_nodes=6) @cluster(num_nodes=6)
@matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_2_1)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_2)], consumer_version=[str(LATEST_2_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_2)], consumer_version=[str(LATEST_2_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_3)], consumer_version=[str(LATEST_2_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_3)], consumer_version=[str(LATEST_2_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_4)], consumer_version=[str(LATEST_2_4)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_4)], consumer_version=[str(LATEST_2_4)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@ -67,17 +65,6 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@matrix(producer_version=[str(LATEST_3_7)], consumer_version=[str(LATEST_3_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_3_7)], consumer_version=[str(LATEST_3_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_8)], consumer_version=[str(LATEST_3_8)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_3_8)], consumer_version=[str(LATEST_3_8)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_1_0)], consumer_version=[str(LATEST_1_0)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_0_11_0)], consumer_version=[str(LATEST_0_11_0)], compression_types=[["gzip"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_0_10_2)], consumer_version=[str(LATEST_0_10_2)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_0_10_1)], consumer_version=[str(LATEST_0_10_1)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_0_10_0)], consumer_version=[str(LATEST_0_10_0)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk): def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk):
if not new_consumer and metadata_quorum != quorum.zk: if not new_consumer and metadata_quorum != quorum.zk:
raise Exception("ZooKeeper-based consumers are not supported when using a KRaft metadata quorum") raise Exception("ZooKeeper-based consumers are not supported when using a KRaft metadata quorum")

View File

@ -19,8 +19,7 @@ from ducktape.mark.resource import cluster
from ducktape.tests.test import Test from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3
from kafkatest.services.streams import CooperativeRebalanceUpgradeService from kafkatest.services.streams import CooperativeRebalanceUpgradeService
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
@ -44,9 +43,7 @@ class StreamsCooperativeRebalanceUpgradeTest(Test):
second_bounce_phase = "second_bounce_phase-" second_bounce_phase = "second_bounce_phase-"
# !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED # !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED
streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), streams_eager_rebalance_upgrade_versions = [str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]
str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2),
str(LATEST_2_3)]
def __init__(self, test_context): def __init__(self, test_context):
super(StreamsCooperativeRebalanceUpgradeTest, self).__init__(test_context) super(StreamsCooperativeRebalanceUpgradeTest, self).__init__(test_context)

View File

@ -22,8 +22,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
StreamsUpgradeTestJobRunnerService StreamsUpgradeTestJobRunnerService
from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \ LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \
KafkaVersion KafkaVersion
@ -33,8 +32,7 @@ broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), st
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)] str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)]
metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
# upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0 # upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0
# -> https://issues.apache.org/jira/browse/KAFKA-14646 # -> https://issues.apache.org/jira/browse/KAFKA-14646

View File

@ -62,21 +62,6 @@ class KafkaVersion(LooseVersion):
return LooseVersion._cmp(self, other) return LooseVersion._cmp(self, other)
def consumer_supports_bootstrap_server(self):
"""
Kafka supported a new consumer beginning with v0.9.0 where
we can specify --bootstrap-server instead of --zookeeper.
This version also allowed a --consumer-config file where we could specify
a security protocol other than PLAINTEXT.
:return: true if the version of Kafka supports a new consumer with --bootstrap-server
"""
return self >= V_0_9_0_0
def supports_named_listeners(self):
return self >= V_0_10_2_0
def acl_command_supports_bootstrap_server(self): def acl_command_supports_bootstrap_server(self):
return self >= V_2_1_0 return self >= V_2_1_0
@ -127,58 +112,6 @@ DEV_VERSION = KafkaVersion("4.0.0-SNAPSHOT")
# This should match the LATEST_PRODUCTION version defined in MetadataVersion.java # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java
LATEST_STABLE_METADATA_VERSION = "4.0-IV0" LATEST_STABLE_METADATA_VERSION = "4.0-IV0"
# 0.8.2.x versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
V_0_8_2_2 = KafkaVersion("0.8.2.2")
LATEST_0_8_2 = V_0_8_2_2
# 0.9.0.x versions
V_0_9_0_0 = KafkaVersion("0.9.0.0")
V_0_9_0_1 = KafkaVersion("0.9.0.1")
LATEST_0_9 = V_0_9_0_1
# 0.10.0.x versions
V_0_10_0_0 = KafkaVersion("0.10.0.0")
V_0_10_0_1 = KafkaVersion("0.10.0.1")
LATEST_0_10_0 = V_0_10_0_1
# 0.10.1.x versions
V_0_10_1_0 = KafkaVersion("0.10.1.0")
V_0_10_1_1 = KafkaVersion("0.10.1.1")
LATEST_0_10_1 = V_0_10_1_1
# 0.10.2.x versions
V_0_10_2_0 = KafkaVersion("0.10.2.0")
V_0_10_2_1 = KafkaVersion("0.10.2.1")
V_0_10_2_2 = KafkaVersion("0.10.2.2")
LATEST_0_10_2 = V_0_10_2_2
LATEST_0_10 = LATEST_0_10_2
# 0.11.0.x versions
V_0_11_0_0 = KafkaVersion("0.11.0.0")
V_0_11_0_1 = KafkaVersion("0.11.0.1")
V_0_11_0_2 = KafkaVersion("0.11.0.2")
V_0_11_0_3 = KafkaVersion("0.11.0.3")
LATEST_0_11_0 = V_0_11_0_3
LATEST_0_11 = LATEST_0_11_0
# 1.0.x versions
V_1_0_0 = KafkaVersion("1.0.0")
V_1_0_1 = KafkaVersion("1.0.1")
V_1_0_2 = KafkaVersion("1.0.2")
LATEST_1_0 = V_1_0_2
# 1.1.x versions
V_1_1_0 = KafkaVersion("1.1.0")
V_1_1_1 = KafkaVersion("1.1.1")
LATEST_1_1 = V_1_1_1
# 2.0.x versions
V_2_0_0 = KafkaVersion("2.0.0")
V_2_0_1 = KafkaVersion("2.0.1")
LATEST_2_0 = V_2_0_1
# 2.1.x versions # 2.1.x versions
V_2_1_0 = KafkaVersion("2.1.0") V_2_1_0 = KafkaVersion("2.1.0")
V_2_1_1 = KafkaVersion("2.1.1") V_2_1_1 = KafkaVersion("2.1.1")

View File

@ -16,7 +16,7 @@
from kafkatest.directory_layout.kafka_path import create_path_resolver, KafkaSystemTestPathResolver, \ from kafkatest.directory_layout.kafka_path import create_path_resolver, KafkaSystemTestPathResolver, \
KAFKA_PATH_RESOLVER_KEY KAFKA_PATH_RESOLVER_KEY
from kafkatest.version import V_0_9_0_1, DEV_BRANCH, KafkaVersion from kafkatest.version import V_2_1_0, DEV_BRANCH, KafkaVersion
class DummyContext(object): class DummyContext(object):
@ -64,9 +64,9 @@ class CheckCreatePathResolver(object):
"""Check expected paths when using versions.""" """Check expected paths when using versions."""
resolver = create_path_resolver(DummyContext()) resolver = create_path_resolver(DummyContext())
assert resolver.home(V_0_9_0_1) == "/opt/kafka-0.9.0.1" assert resolver.home(V_2_1_0) == "/opt/kafka-2.1.0"
assert resolver.bin(V_0_9_0_1) == "/opt/kafka-0.9.0.1/bin" assert resolver.bin(V_2_1_0) == "/opt/kafka-2.1.0/bin"
assert resolver.script("kafka-run-class.sh", V_0_9_0_1) == "/opt/kafka-0.9.0.1/bin/kafka-run-class.sh" assert resolver.script("kafka-run-class.sh", V_2_1_0) == "/opt/kafka-2.1.0/bin/kafka-run-class.sh"
def check_node_or_version_helper(self): def check_node_or_version_helper(self):
"""KafkaSystemTestPathResolver has a helper method which can take a node or version, and returns the version. """KafkaSystemTestPathResolver has a helper method which can take a node or version, and returns the version.
@ -79,8 +79,8 @@ class CheckCreatePathResolver(object):
assert resolver._version(node) == DEV_BRANCH assert resolver._version(node) == DEV_BRANCH
# Node with version attribute should resolve to the version attribute # Node with version attribute should resolve to the version attribute
node.version = V_0_9_0_1 node.version = V_2_1_0
assert resolver._version(node) == V_0_9_0_1 assert resolver._version(node) == V_2_1_0
# A KafkaVersion object should resolve to itself # A KafkaVersion object should resolve to itself
assert resolver._version(DEV_BRANCH) == DEV_BRANCH assert resolver._version(DEV_BRANCH) == DEV_BRANCH

View File

@ -15,7 +15,7 @@
from mock import Mock from mock import Mock
from kafkatest.version import DEV_BRANCH, V_0_8_2_2, get_version from kafkatest.version import DEV_BRANCH, V_2_1_0, get_version
class CheckVersion(object): class CheckVersion(object):
@ -29,5 +29,5 @@ class CheckVersion(object):
assert get_version(node) == DEV_BRANCH assert get_version(node) == DEV_BRANCH
node = Mock() node = Mock()
node.version = V_0_8_2_2 node.version = V_2_1_0
assert get_version(node) == V_0_8_2_2 assert get_version(node) == V_2_1_0

View File

@ -114,24 +114,6 @@ apt-get install -y iperf traceroute
# We want to use the latest Scala version per Kafka version # We want to use the latest Scala version per Kafka version
# Previously we could not pull in Scala 2.12 builds, because Scala 2.12 requires Java 8 and we were running the system # Previously we could not pull in Scala 2.12 builds, because Scala 2.12 requires Java 8 and we were running the system
# tests with Java 7. We have since switched to Java 8, so 2.0.0 and later use Scala 2.12. # tests with Java 7. We have since switched to Java 8, so 2.0.0 and later use Scala 2.12.
get_kafka 0.8.2.2 2.11
chmod a+rw /opt/kafka-0.8.2.2
get_kafka 0.9.0.1 2.11
chmod a+rw /opt/kafka-0.9.0.1
get_kafka 0.10.0.1 2.11
chmod a+rw /opt/kafka-0.10.0.1
get_kafka 0.10.1.1 2.11
chmod a+rw /opt/kafka-0.10.1.1
get_kafka 0.10.2.2 2.11
chmod a+rw /opt/kafka-0.10.2.2
get_kafka 0.11.0.3 2.11
chmod a+rw /opt/kafka-0.11.0.3
get_kafka 1.0.2 2.11
chmod a+rw /opt/kafka-1.0.2
get_kafka 1.1.1 2.11
chmod a+rw /opt/kafka-1.1.1
get_kafka 2.0.1 2.12
chmod a+rw /opt/kafka-2.0.1
get_kafka 2.1.1 2.12 get_kafka 2.1.1 2.12
chmod a+rw /opt/kafka-2.1.1 chmod a+rw /opt/kafka-2.1.1
get_kafka 2.2.2 2.12 get_kafka 2.2.2 2.12