KAFKA-8509; Add downgrade system test (#7724)

This patch adds a basic downgrade system test. It verifies that producing and consuming continues to work before and after the downgrade.

Reviewers: Ismael Juma <ismael@juma.me.uk>, David Arthur <mumrah@gmail.com>
This commit is contained in:
Jason Gustafson 2019-11-22 10:09:13 -08:00 committed by GitHub
parent 5d0c2f3b2a
commit 9d8ab3a1a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 146 additions and 9 deletions

View File

@ -21,7 +21,7 @@ from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.version import DEV_BRANCH
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_0_10_0_0
class ConsumerState:
@ -167,7 +167,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
def __init__(self, context, num_nodes, kafka, topic, group_id,
static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
assignment_strategy=None,
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
"""
@ -175,7 +175,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
"""
super(VerifiableConsumer, self).__init__(context, num_nodes)
self.log_level = log_level
self.kafka = kafka
self.topic = topic
self.group_id = group_id
@ -225,7 +224,14 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
# apply group.instance.id to the node for static membership validation
node.group_instance_id = None
if self.static_membership:
assert node.version >= V_2_3_0, \
"Version %s does not support static membership (must be 2.3 or higher)" % str(node.version)
node.group_instance_id = self.group_id + "-instance-" + str(idx)
if self.assignment_strategy:
assert node.version >= V_0_10_0_0, \
"Version %s does not setting an assignment strategy (must be 0.10.0 or higher)" % str(node.version)
cmd = self.start_cmd(node)
self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
@ -292,9 +298,24 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
cmd += self.impl.exec_cmd(node)
if self.on_record_consumed:
cmd += " --verbose"
cmd += " --reset-policy %s --group-id %s --topic %s --group-instance-id %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
(self.reset_policy, self.group_id, self.topic, node.group_instance_id, self.kafka.bootstrap_servers(self.security_config.security_protocol),
self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
if node.group_instance_id:
cmd += " --group-instance-id %s" % node.group_instance_id
elif node.version == V_2_3_0 or node.version == V_2_3_1:
# In 2.3, --group-instance-id was required, but would be left empty
# if `None` is passed as the argument value
cmd += " --group-instance-id None"
if self.assignment_strategy:
cmd += " --assignment-strategy %s" % self.assignment_strategy
if self.enable_autocommit:
cmd += " --enable-autocommit %s" % self.enable_autocommit
cmd += " --reset-policy %s --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
(self.reset_policy, self.group_id, self.topic,
self.kafka.bootstrap_servers(self.security_config.security_protocol),
self.session_timeout_sec*1000)
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)

View File

@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import config_property
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.end_to_end import EndToEndTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
class TestDowngrade(EndToEndTest):
TOPIC_CONFIG = {
"partitions": 3,
"replication-factor": 3,
"configs": {"min.insync.replicas": 2}
}
def __init__(self, test_context):
super(TestDowngrade, self).__init__(test_context=test_context, topic_config=self.TOPIC_CONFIG)
def upgrade_from(self, kafka_version):
for node in self.kafka.nodes:
self.kafka.stop_node(node)
node.version = DEV_BRANCH
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(kafka_version)
node.config[config_property.MESSAGE_FORMAT_VERSION] = str(kafka_version)
self.kafka.start_node(node)
def downgrade_to(self, kafka_version):
for node in self.kafka.nodes:
self.kafka.stop_node(node)
node.version = kafka_version
del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
del node.config[config_property.MESSAGE_FORMAT_VERSION]
self.kafka.start_node(node)
def setup_services(self, kafka_version, compression_types, security_protocol):
self.create_zookeeper()
self.zk.start()
self.create_kafka(num_nodes=3,
security_protocol=security_protocol,
interbroker_security_protocol=security_protocol,
version=kafka_version)
self.kafka.start()
self.create_producer(log_level="DEBUG",
compression_types=compression_types,
version=kafka_version)
self.producer.start()
self.create_consumer(log_level="DEBUG",
version=kafka_version)
self.consumer.start()
@cluster(num_nodes=7)
@parametrize(version=str(LATEST_2_3), compression_types=["none"])
@parametrize(version=str(LATEST_2_3), compression_types=["zstd"], security_protocol="SASL_SSL")
@parametrize(version=str(LATEST_2_2), compression_types=["none"])
@parametrize(version=str(LATEST_2_2), compression_types=["zstd"], security_protocol="SASL_SSL")
@parametrize(version=str(LATEST_2_1), compression_types=["none"])
@parametrize(version=str(LATEST_2_1), compression_types=["lz4"], security_protocol="SASL_SSL")
@parametrize(version=str(LATEST_2_0), compression_types=["none"])
@parametrize(version=str(LATEST_2_0), compression_types=["snappy"], security_protocol="SASL_SSL")
@parametrize(version=str(LATEST_1_1), compression_types=["none"])
@parametrize(version=str(LATEST_1_1), compression_types=["lz4"], security_protocol="SASL_SSL")
def test_upgrade_and_downgrade(self, version, compression_types, security_protocol="PLAINTEXT"):
"""Test upgrade and downgrade of Kafka cluster from old versions to the current version
`version` is the Kafka version to upgrade from and downgrade back to
Downgrades are supported to any version which is at or above the current
`inter.broker.protocol.version` (IBP). For example, if a user upgrades from 1.1 to 2.3,
but they leave the IBP set to 1.1, then downgrading to any version at 1.1 or higher is
supported.
This test case verifies that producers and consumers continue working during
the course of an upgrade and downgrade.
- Start 3 node broker cluster on version 'kafka_version'
- Start producer and consumer in the background
- Roll the cluster to upgrade to the current version with IBP set to 'kafka_version'
- Roll the cluster to downgrade back to 'kafka_version'
- Finally, validate that every message acked by the producer was consumed by the consumer
"""
kafka_version = KafkaVersion(version)
self.setup_services(kafka_version, compression_types, security_protocol)
self.await_startup()
self.logger.info("First pass bounce - rolling upgrade")
self.upgrade_from(kafka_version)
self.run_validation()
self.logger.info("Second pass bounce - rolling downgrade")
self.downgrade_to(kafka_version)
self.run_validation()

View File

@ -90,7 +90,8 @@ class EndToEndTest(Test):
for partition, offset in last_acked_offsets.iteritems():
if not partition in self.last_consumed_offsets:
return False
if self.last_consumed_offsets[partition] < offset:
last_commit = self.consumer.last_commit(partition)
if not last_commit or last_commit < offset:
return False
return True

View File

@ -529,7 +529,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
parser.addArgument("--group-instance-id")
.action(store())
.required(true)
.required(false)
.type(String.class)
.metavar("GROUP_INSTANCE_ID")
.dest("groupInstanceId")
@ -613,7 +613,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
String groupInstanceId = res.getString("groupInstanceId");
if (!groupInstanceId.equals("None")) {
if (groupInstanceId != null) {
consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));