From 440e0b8801bca5a648a2cc2a7e98f25c44b0810d Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 11 Nov 2024 06:22:06 +0800 Subject: [PATCH] KAFKA-17923 Remove old kafka version from e2e (#17673) Reviewers: Chia-Ping Tsai --- tests/docker/Dockerfile | 16 ----- .../kafkatest/directory_layout/kafka_path.py | 7 +- .../sanity_checks/test_console_consumer.py | 23 ------- .../test_performance_services.py | 15 ++--- .../sanity_checks/test_verifiable_producer.py | 21 +----- tests/kafkatest/services/console_consumer.py | 39 +++-------- .../services/kafka/templates/kafka.properties | 5 -- tests/kafkatest/services/kafka/util.py | 11 +-- tests/kafkatest/services/monitor/jmx.py | 7 +- .../performance/consumer_performance.py | 56 ++++------------ .../performance/end_to_end_latency.py | 24 +------ .../performance/producer_performance.py | 4 -- tests/kafkatest/services/verifiable_client.py | 14 +--- .../kafkatest/services/verifiable_consumer.py | 6 +- .../client_compatibility_features_test.py | 7 +- ...ient_compatibility_produce_consume_test.py | 7 +- tests/kafkatest/tests/client/quota_test.py | 16 +---- .../tests/connect/connect_distributed_test.py | 2 +- .../compatibility_test_new_broker_test.py | 17 +---- ...eams_cooperative_rebalance_upgrade_test.py | 7 +- .../tests/streams/streams_upgrade_test.py | 6 +- tests/kafkatest/version.py | 67 ------------------- .../directory_layout/check_project_paths.py | 12 ++-- tests/unit/version/check_version.py | 6 +- vagrant/base.sh | 18 ----- 25 files changed, 60 insertions(+), 353 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index aefb500ef0e..198ffde1b7c 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -77,15 +77,6 @@ RUN echo 'PermitUserEnvironment yes' >> /etc/ssh/sshd_config # Install binary test dependencies. # we use the same versions as in vagrant/base.sh ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" -RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2" -RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" -RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" -RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" -RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2" -RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3" -RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2" -RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1" -RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1" RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1" RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" @@ -106,13 +97,6 @@ RUN mkdir -p "/opt/kafka-3.8.1" && chmod a+rw /opt/kafka-3.8.1 && curl -s "$KAFK # Streams test dependencies -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py index 1e3d0034e4c..90a84d44af5 100644 --- a/tests/kafkatest/directory_layout/kafka_path.py +++ b/tests/kafkatest/directory_layout/kafka_path.py @@ -16,7 +16,7 @@ import importlib import os -from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH, LATEST_0_9, LATEST_3_5 +from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH, LATEST_3_5 """This module serves a few purposes: @@ -55,11 +55,6 @@ JARS = { # This version of the file connectors does not contain ServiceLoader manifests LATEST_3_5.__str__(): { CONNECT_FILE_JAR: "libs/connect-file*.jar" - }, - # TODO: This is only used in 0.8.2.x system tests, remove with KAFKA-14762 - LATEST_0_9.__str__(): { - TOOLS_JAR_NAME: "libs/kafka-tools*.jar", - TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/{argparse4j,jackson}*.jar" } } diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 675920ade8f..9a271232541 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -22,10 +22,8 @@ from ducktape.utils.util import wait_until from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.utils.remote_account import line_count, file_exists -from kafkatest.version import LATEST_0_8_2 class ConsoleConsumerTest(Test): @@ -77,24 +75,3 @@ class ConsoleConsumerTest(Test): assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 self.consumer.stop_node(node) - - @cluster(num_nodes=4) - def test_version(self): - """Check that console consumer v0.8.2.X successfully starts and consumes messages.""" - self.kafka.start() - - num_messages = 1000 - self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, - max_messages=num_messages, throughput=1000) - self.producer.start() - self.producer.wait() - - self.consumer.nodes[0].version = LATEST_0_8_2 - self.consumer.new_consumer = False - self.consumer.consumer_timeout_ms = 1000 - self.consumer.start() - self.consumer.wait() - - num_consumed = len(self.consumer.messages_consumed[1]) - num_produced = self.producer.num_acked - assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed) diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py index f00ec492d4e..27e732edfb5 100644 --- a/tests/kafkatest/sanity_checks/test_performance_services.py +++ b/tests/kafkatest/sanity_checks/test_performance_services.py @@ -21,7 +21,7 @@ from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService from kafkatest.services.performance import latency, compute_aggregate_throughput from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_1_1, KafkaVersion +from kafkatest.version import DEV_BRANCH, LATEST_2_1, KafkaVersion class PerformanceServiceTest(Test): @@ -38,15 +38,8 @@ class PerformanceServiceTest(Test): self.zk.start() @cluster(num_nodes=5) - # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, - # the overhead should be manageable. - @parametrize(version=str(LATEST_0_8_2), new_consumer=False) - @parametrize(version=str(LATEST_0_9), new_consumer=False) - @parametrize(version=str(LATEST_0_9)) - @parametrize(version=str(LATEST_1_1), new_consumer=False) - @cluster(num_nodes=5) - @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all) - def test_version(self, version=str(LATEST_0_9), new_consumer=True, metadata_quorum=quorum.zk): + @matrix(version=[str(LATEST_2_1), str(DEV_BRANCH)], metadata_quorum=quorum.all) + def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk): """ Sanity check out producer performance service - verify that we can run the service with a small number of messages. The actual stats here are pretty meaningless since the number of messages is quite small. @@ -80,7 +73,7 @@ class PerformanceServiceTest(Test): # check basic run of consumer performance service self.consumer_perf = ConsumerPerformanceService( - self.test_context, 1, self.kafka, new_consumer=new_consumer, + self.test_context, 1, self.kafka, topic=self.topic, version=version, messages=self.num_records) self.consumer_perf.group = "test-consumer-group" self.consumer_perf.run() diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py index 152e9697293..9bd365bee64 100644 --- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -23,7 +23,7 @@ from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.utils import is_version -from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, DEV_BRANCH, KafkaVersion +from kafkatest.version import DEV_BRANCH, KafkaVersion class TestVerifiableProducer(Test): @@ -45,10 +45,6 @@ class TestVerifiableProducer(Test): self.zk.start() @cluster(num_nodes=3) - @parametrize(producer_version=str(LATEST_0_8_2)) - @parametrize(producer_version=str(LATEST_0_9)) - @parametrize(producer_version=str(LATEST_0_10_0)) - @parametrize(producer_version=str(LATEST_0_10_1)) @matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], enable_idempotence=[False]) @matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], enable_idempotence=[True]) @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all) @@ -81,20 +77,7 @@ class TestVerifiableProducer(Test): wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15, err_msg="Producer failed to start in a reasonable amount of time.") - # using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring - # that this check works with DEV_BRANCH - # When running VerifiableProducer 0.8.X, both the current branch version and 0.8.X should show up because of the - # way verifiable producer pulls in some development directories into its classpath - # - # If the test fails here because 'ps .. | grep' couldn't find the process it means - # the login and grep that is_version() performs is slower than - # the time it takes the producer to produce its messages. - # Easy fix is to decrease throughput= above, the good fix is to make the producer - # not terminate until explicitly killed in this case. - if node.version <= LATEST_0_8_2: - assert is_version(node, [node.version.vstring, LATEST_0_9.vstring], logger=self.logger) - else: - assert is_version(node, [node.version.vstring], logger=self.logger) + assert is_version(node, [node.version.vstring], logger=self.logger) self.producer.wait() num_produced = self.producer.num_acked diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index fb87f20df19..cc8a0d31997 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 +from kafkatest.version import DEV_BRANCH, LATEST_3_7 from kafkatest.services.kafka.util import fix_opts_for_new_jvm """ @@ -118,9 +118,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) self.isolation_level = isolation_level self.enable_systest_events = enable_systest_events - if self.enable_systest_events: - # Only available in 0.10.0 and up - assert version >= V_0_10_0_0 self.print_timestamp = print_timestamp self.jaas_override_variables = jaas_override_variables or {} @@ -134,10 +131,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) """Return a string which can be used to create a configuration file appropriate for the given node.""" # Process client configuration prop_file = self.render('console_consumer.properties') - if hasattr(node, "version") and node.version <= LATEST_0_8_2: - # in 0.8.2.X and earlier, console consumer does not have --timeout-ms option - # instead, we have to pass it through the config file - prop_file += "\nconsumer.timeout.ms=%s\n" % str(self.consumer_timeout_ms) # Add security properties to the config. If security protocol is not specified, # use the default in the template properties. @@ -176,19 +169,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) "%(console_consumer)s " \ "--topic %(topic)s " \ "--consumer.config %(config_file)s " % args - - if self.new_consumer: - assert node.version.consumer_supports_bootstrap_server(), \ - "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(node.version) - if node.version <= LATEST_0_10_0: - cmd += " --new-consumer" - cmd += " --bootstrap-server %(broker_list)s" % args - if node.version >= V_0_11_0_0: - cmd += " --isolation-level %s" % self.isolation_level - else: - assert node.version < V_2_0_0, \ - "new_consumer==false is only supported if version < 2.0.0, version %s" % str(node.version) - cmd += " --zookeeper %(zk_connect)s" % args + cmd += " --bootstrap-server %(broker_list)s" % args + cmd += " --isolation-level %s" % self.isolation_level if self.from_beginning: cmd += " --from-beginning" @@ -196,8 +178,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) if self.consumer_timeout_ms is not None: # version 0.8.X and below do not support --timeout-ms option # This will be added in the properties file instead - if node.version > LATEST_0_8_2: - cmd += " --timeout-ms %s" % self.consumer_timeout_ms + cmd += " --timeout-ms %s" % self.consumer_timeout_ms if self.print_timestamp: cmd += " --property print.timestamp=true" @@ -209,16 +190,12 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) cmd += " --property print.partition=true" # LoggingMessageFormatter was introduced after 0.9 - if node.version > LATEST_0_9: - if node.version > LATEST_3_7: - cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" - else: - cmd += " --formatter kafka.tools.LoggingMessageFormatter" + if node.version > LATEST_3_7: + cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" + else: + cmd += " --formatter kafka.tools.LoggingMessageFormatter" if self.enable_systest_events: - # enable systest events is only available in 0.10.0 and later - # check the assertion here as well, in case node.version has been modified - assert node.version >= V_0_10_0_0 cmd += " --enable-systest-events" if self.consumer_properties is not None: diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 21b60afeb83..1fd305a17df 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -41,12 +41,7 @@ listener.security.protocol.map={{ listener_security_protocol_map }} {% if quorum_info.using_zk or quorum_info.has_brokers %} advertised.host.name={{ node.account.hostname }} advertised.listeners={{ advertised_listeners }} - -{% if node.version.supports_named_listeners() %} inter.broker.listener.name={{ interbroker_listener.name }} -{% else %} -security.inter.broker.protocol={{ interbroker_listener.security_protocol }} -{% endif %} {% endif %} {% for k, v in listener_security_config.client_listener_overrides.items() %} diff --git a/tests/kafkatest/services/kafka/util.py b/tests/kafkatest/services/kafka/util.py index de6b85ff3c1..0965fd9d4e4 100644 --- a/tests/kafkatest/services/kafka/util.py +++ b/tests/kafkatest/services/kafka/util.py @@ -16,12 +16,9 @@ from collections import namedtuple from kafkatest.utils.remote_account import java_version -from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0 TopicPartition = namedtuple('TopicPartition', ['topic', 'partition']) -new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)]) - def fix_opts_for_new_jvm(node): # Startup scripts for early versions of Kafka contains options # that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC. @@ -31,12 +28,6 @@ def fix_opts_for_new_jvm(node): if java_ver <= 9: return "" - cmd = "" - # check kafka version for kafka node types - if hasattr(node, 'version'): - if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version == LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0: - cmd += "export KAFKA_GC_LOG_OPTS=\"-Xlog:gc*:file=kafka-gc.log:time,tags:filecount=10,filesize=102400\"; " - cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; " - return cmd + return "" diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index b326c20aa3b..99a604d888a 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -19,7 +19,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin -from kafkatest.version import get_version, V_0_11_0_0, V_3_4_0, DEV_BRANCH +from kafkatest.version import get_version, V_3_4_0, DEV_BRANCH class JmxMixin(object): """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats. @@ -139,10 +139,7 @@ class JmxMixin(object): # To correctly wait for requested JMX metrics to be added we need the --wait option for JmxTool. This option was # not added until 0.11.0.1, so any earlier versions need to use JmxTool from a newer version. version = get_version(node) - if version <= V_0_11_0_0: - return DEV_BRANCH - else: - return version + return version def jmx_class_name(self, version): if version <= V_3_4_0: diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 34e2e0d05cd..eea91cbfd90 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -18,8 +18,7 @@ import os from kafkatest.services.kafka.util import fix_opts_for_new_jvm from kafkatest.services.performance import PerformanceService -from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0 +from kafkatest.version import V_2_5_0, DEV_BRANCH class ConsumerPerformanceService(PerformanceService): @@ -65,25 +64,14 @@ class ConsumerPerformanceService(PerformanceService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANCH, new_consumer=True, settings={}): + def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANCH, settings={}): super(ConsumerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka self.security_config = kafka.security_config.client_config() self.topic = topic self.messages = messages - self.new_consumer = new_consumer self.settings = settings - assert version.consumer_supports_bootstrap_server() or (not new_consumer), \ - "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version) - - assert version < V_2_0_0 or new_consumer, \ - "new_consumer==false is only supported if version < 2.0.0, version %s" % str(version) - - security_protocol = self.security_config.security_protocol - assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \ - "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) - # These less-frequently used settings can be updated manually after instantiation self.fetch_size = None self.socket_buffer_size = None @@ -97,17 +85,13 @@ class ConsumerPerformanceService(PerformanceService): """Dictionary of arguments used to start the Consumer Performance script.""" args = { 'topic': self.topic, - 'messages': self.messages, + 'messages': self.messages } - if self.new_consumer: - if version <= LATEST_0_10_0: - args['new-consumer'] = "" - args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) - else: - args['bootstrap-server'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) + if version < V_2_5_0: + args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) else: - args['zookeeper'] = self.kafka.zk_connect_setting() + args['bootstrap-server'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) if self.fetch_size is not None: args['fetch-size'] = self.fetch_size @@ -132,9 +116,7 @@ class ConsumerPerformanceService(PerformanceService): for key, value in self.args(node.version).items(): cmd += " --%s %s" % (key, value) - if node.version.consumer_supports_bootstrap_server(): - # This is only used for security settings - cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE + cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) @@ -143,22 +125,6 @@ class ConsumerPerformanceService(PerformanceService): 'stderr': ConsumerPerformanceService.STDERR_CAPTURE} return cmd - def parse_results(self, line, version): - parts = line.split(',') - if version.consumer_supports_bootstrap_server(): - result = { - 'total_mb': float(parts[2]), - 'mbps': float(parts[3]), - 'records_per_sec': float(parts[5]), - } - else: - result = { - 'total_mb': float(parts[3]), - 'mbps': float(parts[4]), - 'records_per_sec': float(parts[6]), - } - return result - def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False) @@ -174,4 +140,10 @@ class ConsumerPerformanceService(PerformanceService): last = line # Parse and save the last line's information - self.results[idx-1] = self.parse_results(last, node.version) + if last is not None: + parts = last.split(',') + self.results[idx-1] = { + 'total_mb': float(parts[2]), + 'mbps': float(parts[3]), + 'records_per_sec': float(parts[5]), + } diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index 5e66b06104c..e7e0100e511 100644 --- a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -53,14 +53,6 @@ class EndToEndLatencyService(PerformanceService): self.security_config = kafka.security_config.client_config() self.version = '' - security_protocol = self.security_config.security_protocol - - if not version.consumer_supports_bootstrap_server(): - assert security_protocol == SecurityConfig.PLAINTEXT, \ - "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) - assert compression_type == "none", \ - "Compression type %s is only supported if version >= 0.9.0.0, version %s" % (compression_type, str(version)) - self.args = { 'topic': topic, 'num_records': num_records, @@ -82,20 +74,11 @@ class EndToEndLatencyService(PerformanceService): 'kafka_run_class': self.path.script("kafka-run-class.sh", node), 'java_class_name': self.java_class_name() }) - if not node.version.consumer_supports_bootstrap_server(): - args.update({ - 'zk_connect': self.kafka.zk_connect_setting(), - }) cmd = fix_opts_for_new_jvm(node) cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG - if node.version.consumer_supports_bootstrap_server(): - cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args - cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args - else: - # Set fetch max wait to 0 to match behavior in later versions - cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s kafka.tools.TestEndToEndLatency " % args - cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args + cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args + cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': EndToEndLatencyService.STDOUT_CAPTURE, 'stderr': EndToEndLatencyService.STDERR_CAPTURE} @@ -109,8 +92,7 @@ class EndToEndLatencyService(PerformanceService): node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config) client_config = str(self.security_config) - if node.version.consumer_supports_bootstrap_server(): - client_config += "compression_type=%(compression_type)s" % self.args + client_config += "compression_type=%(compression_type)s" % self.args node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config) self.security_config.setup_node(node) diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index e0f00061cb9..acb0aec8650 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -55,10 +55,6 @@ class ProducerPerformanceService(HttpMetricsCollector, PerformanceService): self.kafka = kafka self.security_config = kafka.security_config.client_config() - security_protocol = self.security_config.security_protocol - assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \ - "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) - self.args = { 'topic': topic, 'kafka_opts': self.security_config.kafka_opts, diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py index 649a428c37d..4971136a64e 100644 --- a/tests/kafkatest/services/verifiable_client.py +++ b/tests/kafkatest/services/verifiable_client.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9 from ducktape.cluster.remoteaccount import RemoteCommandError import importlib @@ -249,17 +247,7 @@ class VerifiableClientJava (VerifiableClient): def exec_cmd (self, node): """ :return: command to execute to start instance Translates Verifiable* to the corresponding Java client class name """ - cmd = "" - if self.java_class_name == 'VerifiableProducer' and node.version <= LATEST_0_8_2: - # 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add - # the tools jar from 0.9.x to the classpath - # TODO remove with KAFKA-14762 - tools_jar = self.parent.path.jar(TOOLS_JAR_NAME, LATEST_0_9) - tools_dependant_libs_jar = self.parent.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, LATEST_0_9) - cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar - cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar - cmd += "export CLASSPATH; " - cmd += fix_opts_for_new_jvm(node) + cmd = fix_opts_for_new_jvm(node) cmd += self.parent.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + self.java_class_name return cmd diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index a62dc047842..7e81ca1f7ce 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -21,7 +21,7 @@ from ducktape.services.background_thread import BackgroundThreadService from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import TopicPartition, consumer_group from kafkatest.services.verifiable_client import VerifiableClientMixin -from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0, V_4_0_0 +from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0 class ConsumerState: @@ -317,10 +317,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou "Version %s does not support static membership (must be 2.3 or higher)" % str(node.version) node.group_instance_id = self.group_id + "-instance-" + str(idx) - if self.assignment_strategy: - assert node.version >= V_0_10_0_0, \ - "Version %s does not setting an assignment strategy (must be 0.10.0 or higher)" % str(node.version) - cmd = self.start_cmd(node) self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd)) diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 7fe31d7336f..d0bcd80a791 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -26,8 +26,8 @@ from ducktape.tests.test import TestContext from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService, quorum from ducktape.tests.test import Test -from kafkatest.version import DEV_BRANCH, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ +from kafkatest.version import DEV_BRANCH, \ + LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion def get_broker_features(broker_version): @@ -107,9 +107,6 @@ class ClientCompatibilityFeaturesTest(Test): @cluster(num_nodes=7) @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) - @parametrize(broker_version=str(LATEST_1_0)) - @parametrize(broker_version=str(LATEST_1_1)) - @parametrize(broker_version=str(LATEST_2_0)) @parametrize(broker_version=str(LATEST_2_1)) @parametrize(broker_version=str(LATEST_2_2)) @parametrize(broker_version=str(LATEST_2_3)) diff --git a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py index 3ff3c2ba753..74bd5563200 100644 --- a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py @@ -23,8 +23,8 @@ from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int_with_prefix -from kafkatest.version import DEV_BRANCH, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ +from kafkatest.version import DEV_BRANCH, \ + LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest): @@ -58,9 +58,6 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest): @cluster(num_nodes=9) @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) - @parametrize(broker_version=str(LATEST_1_0)) - @parametrize(broker_version=str(LATEST_1_1)) - @parametrize(broker_version=str(LATEST_2_0)) @parametrize(broker_version=str(LATEST_2_1)) @parametrize(broker_version=str(LATEST_2_2)) @parametrize(broker_version=str(LATEST_2_3)) diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 08a23ecc4f6..d52f9b6a944 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -21,7 +21,7 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.performance import ProducerPerformanceService from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.version import DEV_BRANCH, LATEST_1_1 +from kafkatest.version import DEV_BRANCH class QuotaConfig(object): CLIENT_ID = 'client-id' @@ -129,24 +129,14 @@ class QuotaTest(Test): @cluster(num_nodes=5) @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False]) @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2) - @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_broker_throttling_behavior=True) - @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_client_throttling_behavior=True) - def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1, - old_broker_throttling_behavior=False, old_client_throttling_behavior=False): - # Old (pre-2.0) throttling behavior for broker throttles before sending a response to the client. - if old_broker_throttling_behavior: - self.kafka.set_version(LATEST_1_1) + def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1): self.kafka.start() self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka) producer_client_id = self.quota_config.client_id consumer_client_id = self.quota_config.client_id - # Old (pre-2.0) throttling behavior for client does not throttle upon receiving a response with a non-zero throttle time. - if old_client_throttling_behavior: - client_version = LATEST_1_1 - else: - client_version = DEV_BRANCH + client_version = DEV_BRANCH # Produce all messages producer = ProducerPerformanceService( diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 64a80d2483e..e54118c3881 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -24,7 +24,7 @@ from kafkatest.services.kafka import KafkaService, config_property, quorum, cons from kafkatest.services.connect import ConnectDistributedService, ConnectServiceBase, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, LATEST_2_0, LATEST_1_1, LATEST_1_0, KafkaVersion +from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, KafkaVersion from functools import reduce from collections import Counter, namedtuple diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py index 35edaffc64d..c7f600a0f3e 100644 --- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py +++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py @@ -21,8 +21,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, \ - LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \ +from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \ LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \ LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion @@ -48,8 +47,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): @cluster(num_nodes=6) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) - @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_2_1)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_2)], consumer_version=[str(LATEST_2_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_3)], consumer_version=[str(LATEST_2_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_4)], consumer_version=[str(LATEST_2_4)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @@ -67,17 +65,6 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): @matrix(producer_version=[str(LATEST_3_7)], consumer_version=[str(LATEST_3_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_3_8)], consumer_version=[str(LATEST_3_8)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_1_0)], consumer_version=[str(LATEST_1_0)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_0_11_0)], consumer_version=[str(LATEST_0_11_0)], compression_types=[["gzip"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_0_10_2)], consumer_version=[str(LATEST_0_10_2)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_0_10_1)], consumer_version=[str(LATEST_0_10_1)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_0_10_0)], consumer_version=[str(LATEST_0_10_0)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) - @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) - @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk): if not new_consumer and metadata_quorum != quorum.zk: raise Exception("ZooKeeper-based consumers are not supported when using a KRaft metadata quorum") diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py index 992ad587923..a478f11f340 100644 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py @@ -19,8 +19,7 @@ from ducktape.mark.resource import cluster from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3 +from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3 from kafkatest.services.streams import CooperativeRebalanceUpgradeService from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running @@ -44,9 +43,7 @@ class StreamsCooperativeRebalanceUpgradeTest(Test): second_bounce_phase = "second_bounce_phase-" # !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED - streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), - str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), - str(LATEST_2_3)] + streams_eager_rebalance_upgrade_versions = [str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)] def __init__(self, test_context): super(StreamsCooperativeRebalanceUpgradeTest, self).__init__(test_context) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 2b37e0c2a4f..759e4156920 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -22,8 +22,7 @@ from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \ StreamsUpgradeTestJobRunnerService from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id -from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ +from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \ KafkaVersion @@ -33,8 +32,7 @@ broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), st str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)] -metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), - str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), +metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] # upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0 # -> https://issues.apache.org/jira/browse/KAFKA-14646 diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 56db6431030..00bcf536f84 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -62,21 +62,6 @@ class KafkaVersion(LooseVersion): return LooseVersion._cmp(self, other) - def consumer_supports_bootstrap_server(self): - """ - Kafka supported a new consumer beginning with v0.9.0 where - we can specify --bootstrap-server instead of --zookeeper. - - This version also allowed a --consumer-config file where we could specify - a security protocol other than PLAINTEXT. - - :return: true if the version of Kafka supports a new consumer with --bootstrap-server - """ - return self >= V_0_9_0_0 - - def supports_named_listeners(self): - return self >= V_0_10_2_0 - def acl_command_supports_bootstrap_server(self): return self >= V_2_1_0 @@ -127,58 +112,6 @@ DEV_VERSION = KafkaVersion("4.0.0-SNAPSHOT") # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java LATEST_STABLE_METADATA_VERSION = "4.0-IV0" -# 0.8.2.x versions -V_0_8_2_1 = KafkaVersion("0.8.2.1") -V_0_8_2_2 = KafkaVersion("0.8.2.2") -LATEST_0_8_2 = V_0_8_2_2 - -# 0.9.0.x versions -V_0_9_0_0 = KafkaVersion("0.9.0.0") -V_0_9_0_1 = KafkaVersion("0.9.0.1") -LATEST_0_9 = V_0_9_0_1 - -# 0.10.0.x versions -V_0_10_0_0 = KafkaVersion("0.10.0.0") -V_0_10_0_1 = KafkaVersion("0.10.0.1") -LATEST_0_10_0 = V_0_10_0_1 - -# 0.10.1.x versions -V_0_10_1_0 = KafkaVersion("0.10.1.0") -V_0_10_1_1 = KafkaVersion("0.10.1.1") -LATEST_0_10_1 = V_0_10_1_1 - -# 0.10.2.x versions -V_0_10_2_0 = KafkaVersion("0.10.2.0") -V_0_10_2_1 = KafkaVersion("0.10.2.1") -V_0_10_2_2 = KafkaVersion("0.10.2.2") -LATEST_0_10_2 = V_0_10_2_2 - -LATEST_0_10 = LATEST_0_10_2 - -# 0.11.0.x versions -V_0_11_0_0 = KafkaVersion("0.11.0.0") -V_0_11_0_1 = KafkaVersion("0.11.0.1") -V_0_11_0_2 = KafkaVersion("0.11.0.2") -V_0_11_0_3 = KafkaVersion("0.11.0.3") -LATEST_0_11_0 = V_0_11_0_3 -LATEST_0_11 = LATEST_0_11_0 - -# 1.0.x versions -V_1_0_0 = KafkaVersion("1.0.0") -V_1_0_1 = KafkaVersion("1.0.1") -V_1_0_2 = KafkaVersion("1.0.2") -LATEST_1_0 = V_1_0_2 - -# 1.1.x versions -V_1_1_0 = KafkaVersion("1.1.0") -V_1_1_1 = KafkaVersion("1.1.1") -LATEST_1_1 = V_1_1_1 - -# 2.0.x versions -V_2_0_0 = KafkaVersion("2.0.0") -V_2_0_1 = KafkaVersion("2.0.1") -LATEST_2_0 = V_2_0_1 - # 2.1.x versions V_2_1_0 = KafkaVersion("2.1.0") V_2_1_1 = KafkaVersion("2.1.1") diff --git a/tests/unit/directory_layout/check_project_paths.py b/tests/unit/directory_layout/check_project_paths.py index b9b76f13276..06cf6a4d7dd 100644 --- a/tests/unit/directory_layout/check_project_paths.py +++ b/tests/unit/directory_layout/check_project_paths.py @@ -16,7 +16,7 @@ from kafkatest.directory_layout.kafka_path import create_path_resolver, KafkaSystemTestPathResolver, \ KAFKA_PATH_RESOLVER_KEY -from kafkatest.version import V_0_9_0_1, DEV_BRANCH, KafkaVersion +from kafkatest.version import V_2_1_0, DEV_BRANCH, KafkaVersion class DummyContext(object): @@ -64,9 +64,9 @@ class CheckCreatePathResolver(object): """Check expected paths when using versions.""" resolver = create_path_resolver(DummyContext()) - assert resolver.home(V_0_9_0_1) == "/opt/kafka-0.9.0.1" - assert resolver.bin(V_0_9_0_1) == "/opt/kafka-0.9.0.1/bin" - assert resolver.script("kafka-run-class.sh", V_0_9_0_1) == "/opt/kafka-0.9.0.1/bin/kafka-run-class.sh" + assert resolver.home(V_2_1_0) == "/opt/kafka-2.1.0" + assert resolver.bin(V_2_1_0) == "/opt/kafka-2.1.0/bin" + assert resolver.script("kafka-run-class.sh", V_2_1_0) == "/opt/kafka-2.1.0/bin/kafka-run-class.sh" def check_node_or_version_helper(self): """KafkaSystemTestPathResolver has a helper method which can take a node or version, and returns the version. @@ -79,8 +79,8 @@ class CheckCreatePathResolver(object): assert resolver._version(node) == DEV_BRANCH # Node with version attribute should resolve to the version attribute - node.version = V_0_9_0_1 - assert resolver._version(node) == V_0_9_0_1 + node.version = V_2_1_0 + assert resolver._version(node) == V_2_1_0 # A KafkaVersion object should resolve to itself assert resolver._version(DEV_BRANCH) == DEV_BRANCH diff --git a/tests/unit/version/check_version.py b/tests/unit/version/check_version.py index 8cf8e9a06e7..04148962812 100644 --- a/tests/unit/version/check_version.py +++ b/tests/unit/version/check_version.py @@ -15,7 +15,7 @@ from mock import Mock -from kafkatest.version import DEV_BRANCH, V_0_8_2_2, get_version +from kafkatest.version import DEV_BRANCH, V_2_1_0, get_version class CheckVersion(object): @@ -29,5 +29,5 @@ class CheckVersion(object): assert get_version(node) == DEV_BRANCH node = Mock() - node.version = V_0_8_2_2 - assert get_version(node) == V_0_8_2_2 \ No newline at end of file + node.version = V_2_1_0 + assert get_version(node) == V_2_1_0 \ No newline at end of file diff --git a/vagrant/base.sh b/vagrant/base.sh index 958b9c7d7a8..d57a2d223a4 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -114,24 +114,6 @@ apt-get install -y iperf traceroute # We want to use the latest Scala version per Kafka version # Previously we could not pull in Scala 2.12 builds, because Scala 2.12 requires Java 8 and we were running the system # tests with Java 7. We have since switched to Java 8, so 2.0.0 and later use Scala 2.12. -get_kafka 0.8.2.2 2.11 -chmod a+rw /opt/kafka-0.8.2.2 -get_kafka 0.9.0.1 2.11 -chmod a+rw /opt/kafka-0.9.0.1 -get_kafka 0.10.0.1 2.11 -chmod a+rw /opt/kafka-0.10.0.1 -get_kafka 0.10.1.1 2.11 -chmod a+rw /opt/kafka-0.10.1.1 -get_kafka 0.10.2.2 2.11 -chmod a+rw /opt/kafka-0.10.2.2 -get_kafka 0.11.0.3 2.11 -chmod a+rw /opt/kafka-0.11.0.3 -get_kafka 1.0.2 2.11 -chmod a+rw /opt/kafka-1.0.2 -get_kafka 1.1.1 2.11 -chmod a+rw /opt/kafka-1.1.1 -get_kafka 2.0.1 2.12 -chmod a+rw /opt/kafka-2.0.1 get_kafka 2.1.1 2.12 chmod a+rw /opt/kafka-2.1.1 get_kafka 2.2.2 2.12