KAFKA-5487; upgrade and downgrade streams app system test

- Tests for rolling upgrades of a streams app (keeping the broker config fixed)
- Tests for rolling upgrades of brokers (keeping the streams app config fixed)

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3411 from enothereska/KAFKA-5487-upgrade-test-streams
Eno Thereska authored 2017-06-24 07:48:10 +01:00; committed by Damian Guy
commit ee5eac715d (parent d5e463b9de)
2 changed files with 169 additions and 0 deletions

New file: streams upgrade system test

@@ -0,0 +1,164 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.mark import parametrize
from kafkatest.services.kafka import KafkaService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from kafkatest.version import LATEST_0_10_1, LATEST_0_10_2, DEV_BRANCH, KafkaVersion
import time


class StreamsUpgradeTest(Test):
"""
Tests rolling upgrades and downgrades of the Kafka Streams library.
"""
def __init__(self, test_context):
super(StreamsUpgradeTest, self).__init__(test_context)
self.replication = 3
self.partitions = 1
self.isr = 2
self.topics = {
'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr}},
'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} }
}

    def perform_streams_upgrade(self, to_version):
        self.logger.info("First pass bounce - rolling streams upgrade")

        # get the node running the streams app
        node = self.processor1.node
        self.processor1.stop()

        # change its version; this will automatically make it pick up a different
        # JAR when it starts again
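        # (per-version installs live under /opt/kafka-$version on the worker
        # nodes; see the vagrant/base.sh change below)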
        node.version = KafkaVersion(to_version)
        self.processor1.start()

    def perform_broker_upgrade(self, to_version):
        self.logger.info("First pass bounce - rolling broker upgrade")
        for node in self.kafka.nodes:
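            # bounce one broker at a time so the cluster stays available
            # during the roll (replication factor 3, min ISR 2)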
            self.kafka.stop_node(node)
            node.version = KafkaVersion(to_version)
            self.kafka.start_node(node)

    @cluster(num_nodes=6)
    @parametrize(from_version=str(LATEST_0_10_1), to_version=str(DEV_BRANCH))
    @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
    @parametrize(from_version=str(DEV_BRANCH), to_version=str(LATEST_0_10_2))
    def test_upgrade_downgrade_streams(self, from_version, to_version):
"""
Start a smoke test client, then abort (kill -9) and restart it a few times.
Ensure that all records are delivered.
Note, that just like tests/core/upgrade_test.py, a prerequisite for this test to succeed
if the inclusion of all parametrized versions of kafka in kafka/vagrant/base.sh
(search for get_kafka()). For streams in particular, that means that someone has manually
copies the kafka-stream-$version-test.jar in the right S3 bucket as shown in base.sh.
"""
        # Setup phase
        self.zk = ZookeeperService(self.test_context, num_nodes=1)
        self.zk.start()

        # number of nodes needs to be >= 3 for the smoke test
        self.kafka = KafkaService(self.test_context, num_nodes=3,
                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
        self.kafka.start()

        # allow some time for topics to be created
        time.sleep(10)
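        # Note: a fixed sleep is simple but fragile. A more robust alternative
        # (a sketch only; it assumes a topic-listing helper on KafkaService,
        # which may not exist in this kafkatest version) would poll with
        # ducktape's wait_until instead:
        #
        #   from ducktape.utils.util import wait_until
        #   wait_until(lambda: all(self.kafka.list_topics(topic) for topic in self.topics),
        #              timeout_sec=60, err_msg="topics were not created in time")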
        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
        self.driver.start()
        self.processor1.start()
        time.sleep(15)

        self.perform_streams_upgrade(to_version)

        time.sleep(15)
        self.driver.wait()
        self.driver.stop()
        self.processor1.stop()
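        # verification: grep the service logs for the success markers; with
        # allow_fail=False a non-zero exit from grep (marker missing) raises
        # and fails the test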
        node = self.driver.node
        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)

    @cluster(num_nodes=6)
    @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
    def test_upgrade_brokers(self, from_version, to_version):
        """
        Start a smoke test client, then perform a rolling upgrade of the brokers.
        """
        # Setup phase
        self.zk = ZookeeperService(self.test_context, num_nodes=1)
        self.zk.start()

        # number of nodes needs to be >= 3 for the smoke test
        self.kafka = KafkaService(self.test_context, num_nodes=3,
                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
        self.kafka.start()

        # allow some time for topics to be created
        time.sleep(10)
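        # (the same caveat as in test_upgrade_downgrade_streams applies: a
        # wait_until-style poll would be more robust than a fixed sleep)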
        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
        self.driver.start()
        self.processor1.start()
        time.sleep(15)

        self.perform_broker_upgrade(to_version)

        time.sleep(15)
        self.driver.wait()
        self.driver.stop()
        self.processor1.stop()

        # verification, as in the streams upgrade test above
        node = self.driver.node
        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
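
A note on the mechanics above: ducktape's @parametrize decorator schedules one run of the decorated test per decorator, injecting the keyword arguments, which is how the version matrix is expressed. A minimal sketch of the same pattern (hypothetical class name and version strings, not part of this commit):

    from ducktape.mark import parametrize
    from ducktape.tests.test import Test

    class VersionMatrixSketch(Test):
        # each decorator below becomes one scheduled run of test_roll
        @parametrize(from_version="0.10.1.1", to_version="dev")
        @parametrize(from_version="0.10.2.1", to_version="dev")
        def test_roll(self, from_version, to_version):
            self.logger.info("would roll from %s to %s" % (from_version, to_version))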

vagrant/base.sh

@@ -64,15 +64,20 @@ get_kafka() {
     kafka_dir=/opt/kafka-$version
     url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz
+    # the .tgz above does not include the streams test jar hence we need to get it separately
+    url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka-streams-$version-test.jar
     if [ ! -d /opt/kafka-$version ]; then
         pushd /tmp
         curl -O $url
+        curl -O $url_streams_test || true
         file_tgz=`basename $url`
+        file_streams_jar=`basename $url_streams_test` || true
         tar -xzf $file_tgz
         rm -rf $file_tgz
         file=`basename $file_tgz .tgz`
         mv $file $kafka_dir
+        mv $file_streams_jar $kafka_dir/libs || true
         popd
     fi
 }
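
Design note on the change above: the '|| true' guards make fetching the streams test jar best-effort, so provisioning a version whose test jar has not been uploaded to S3 does not abort the whole get_kafka() step.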