diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index cec8ea0e715..d4e873ac9c9 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -21,7 +21,7 @@ from ducktape.services.background_thread import BackgroundThreadService from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import TopicPartition from kafkatest.services.verifiable_client import VerifiableClientMixin -from kafkatest.version import DEV_BRANCH +from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_0_10_0_0 class ConsumerState: @@ -167,7 +167,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou def __init__(self, context, num_nodes, kafka, topic, group_id, static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False, - assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", + assignment_strategy=None, version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None, on_record_consumed=None, reset_policy="earliest", verify_offsets=True): """ @@ -175,7 +175,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou """ super(VerifiableConsumer, self).__init__(context, num_nodes) self.log_level = log_level - self.kafka = kafka self.topic = topic self.group_id = group_id @@ -225,7 +224,14 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou # apply group.instance.id to the node for static membership validation node.group_instance_id = None if self.static_membership: + assert node.version >= V_2_3_0, \ + "Version %s does not support static membership (must be 2.3 or higher)" % str(node.version) node.group_instance_id = self.group_id + "-instance-" + str(idx) + + if self.assignment_strategy: + assert node.version >= V_0_10_0_0, \ + "Version %s does not setting an assignment strategy (must be 0.10.0 or higher)" % str(node.version) + cmd = self.start_cmd(node) self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd)) @@ -292,9 +298,24 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou cmd += self.impl.exec_cmd(node) if self.on_record_consumed: cmd += " --verbose" - cmd += " --reset-policy %s --group-id %s --topic %s --group-instance-id %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \ - (self.reset_policy, self.group_id, self.topic, node.group_instance_id, self.kafka.bootstrap_servers(self.security_config.security_protocol), - self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "") + + if node.group_instance_id: + cmd += " --group-instance-id %s" % node.group_instance_id + elif node.version == V_2_3_0 or node.version == V_2_3_1: + # In 2.3, --group-instance-id was required, but would be left empty + # if `None` is passed as the argument value + cmd += " --group-instance-id None" + + if self.assignment_strategy: + cmd += " --assignment-strategy %s" % self.assignment_strategy + + if self.enable_autocommit: + cmd += " --enable-autocommit %s" % self.enable_autocommit + + cmd += " --reset-policy %s --group-id %s --topic %s --broker-list %s --session-timeout %s" % \ + (self.reset_policy, self.group_id, self.topic, + self.kafka.bootstrap_servers(self.security_config.security_protocol), + self.session_timeout_sec*1000) if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py new file mode 100644 index 00000000000..0a7322d2f76 --- /dev/null +++ b/tests/kafkatest/tests/core/downgrade_test.py @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.mark import parametrize +from ducktape.mark.resource import cluster + +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import config_property +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.tests.end_to_end import EndToEndTest +from kafkatest.utils import is_int +from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion + +class TestDowngrade(EndToEndTest): + + TOPIC_CONFIG = { + "partitions": 3, + "replication-factor": 3, + "configs": {"min.insync.replicas": 2} + } + + def __init__(self, test_context): + super(TestDowngrade, self).__init__(test_context=test_context, topic_config=self.TOPIC_CONFIG) + + def upgrade_from(self, kafka_version): + for node in self.kafka.nodes: + self.kafka.stop_node(node) + node.version = DEV_BRANCH + node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(kafka_version) + node.config[config_property.MESSAGE_FORMAT_VERSION] = str(kafka_version) + self.kafka.start_node(node) + + def downgrade_to(self, kafka_version): + for node in self.kafka.nodes: + self.kafka.stop_node(node) + node.version = kafka_version + del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] + del node.config[config_property.MESSAGE_FORMAT_VERSION] + self.kafka.start_node(node) + + def setup_services(self, kafka_version, compression_types, security_protocol): + self.create_zookeeper() + self.zk.start() + + self.create_kafka(num_nodes=3, + security_protocol=security_protocol, + interbroker_security_protocol=security_protocol, + version=kafka_version) + self.kafka.start() + + self.create_producer(log_level="DEBUG", + compression_types=compression_types, + version=kafka_version) + self.producer.start() + + self.create_consumer(log_level="DEBUG", + version=kafka_version) + self.consumer.start() + + @cluster(num_nodes=7) + @parametrize(version=str(LATEST_2_3), compression_types=["none"]) + @parametrize(version=str(LATEST_2_3), compression_types=["zstd"], security_protocol="SASL_SSL") + @parametrize(version=str(LATEST_2_2), compression_types=["none"]) + @parametrize(version=str(LATEST_2_2), compression_types=["zstd"], security_protocol="SASL_SSL") + @parametrize(version=str(LATEST_2_1), compression_types=["none"]) + @parametrize(version=str(LATEST_2_1), compression_types=["lz4"], security_protocol="SASL_SSL") + @parametrize(version=str(LATEST_2_0), compression_types=["none"]) + @parametrize(version=str(LATEST_2_0), compression_types=["snappy"], security_protocol="SASL_SSL") + @parametrize(version=str(LATEST_1_1), compression_types=["none"]) + @parametrize(version=str(LATEST_1_1), compression_types=["lz4"], security_protocol="SASL_SSL") + def test_upgrade_and_downgrade(self, version, compression_types, security_protocol="PLAINTEXT"): + """Test upgrade and downgrade of Kafka cluster from old versions to the current version + + `version` is the Kafka version to upgrade from and downgrade back to + + Downgrades are supported to any version which is at or above the current + `inter.broker.protocol.version` (IBP). For example, if a user upgrades from 1.1 to 2.3, + but they leave the IBP set to 1.1, then downgrading to any version at 1.1 or higher is + supported. + + This test case verifies that producers and consumers continue working during + the course of an upgrade and downgrade. + + - Start 3 node broker cluster on version 'kafka_version' + - Start producer and consumer in the background + - Roll the cluster to upgrade to the current version with IBP set to 'kafka_version' + - Roll the cluster to downgrade back to 'kafka_version' + - Finally, validate that every message acked by the producer was consumed by the consumer + """ + kafka_version = KafkaVersion(version) + + self.setup_services(kafka_version, compression_types, security_protocol) + self.await_startup() + + self.logger.info("First pass bounce - rolling upgrade") + self.upgrade_from(kafka_version) + self.run_validation() + + self.logger.info("Second pass bounce - rolling downgrade") + self.downgrade_to(kafka_version) + self.run_validation() diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index 9cc6b41de2e..f385b996e41 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -90,7 +90,8 @@ class EndToEndTest(Test): for partition, offset in last_acked_offsets.iteritems(): if not partition in self.last_consumed_offsets: return False - if self.last_consumed_offsets[partition] < offset: + last_commit = self.consumer.last_commit(partition) + if not last_commit or last_commit < offset: return False return True diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 82a878a5515..0f2715a0528 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -529,7 +529,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons parser.addArgument("--group-instance-id") .action(store()) - .required(true) + .required(false) .type(String.class) .metavar("GROUP_INSTANCE_ID") .dest("groupInstanceId") @@ -613,7 +613,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId")); String groupInstanceId = res.getString("groupInstanceId"); - if (!groupInstanceId.equals("None")) { + if (groupInstanceId != null) { consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); } consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));