mirror of https://github.com/apache/kafka.git
MINOR: Compatibility and upgrade tests for 0.11.0.x
Author: Ismael Juma <ismael@juma.me.uk> Reviewers: Eno Thereska <eno.thereska@gmail.com>, Ewen Cheslack-Postava <me@ewencp.org> Closes #3454 from ijuma/test-upgrades-from-0.11.0.x
This commit is contained in:
parent
342f34a199
commit
49ed16daf4
|
@ -45,6 +45,7 @@ RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11-
|
||||||
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
|
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
|
||||||
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
|
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
|
||||||
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
|
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
|
||||||
|
RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
|
||||||
|
|
||||||
# Set up the ducker user.
|
# Set up the ducker user.
|
||||||
RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers
|
RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers
|
||||||
|
|
|
@ -23,11 +23,11 @@ from ducktape.tests.test import TestContext
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.services.kafka import KafkaService
|
from kafkatest.services.kafka import KafkaService
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_11_0_0, V_0_10_1_0, KafkaVersion
|
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, V_0_11_0_0, V_0_10_1_0, KafkaVersion
|
||||||
|
|
||||||
def get_broker_features(broker_version):
|
def get_broker_features(broker_version):
|
||||||
features = {}
|
features = {}
|
||||||
if (broker_version < V_0_10_1_0):
|
if broker_version < V_0_10_1_0:
|
||||||
features["create-topics-supported"] = False
|
features["create-topics-supported"] = False
|
||||||
features["offsets-for-times-supported"] = False
|
features["offsets-for-times-supported"] = False
|
||||||
features["cluster-id-supported"] = False
|
features["cluster-id-supported"] = False
|
||||||
|
@ -37,7 +37,7 @@ def get_broker_features(broker_version):
|
||||||
features["offsets-for-times-supported"] = True
|
features["offsets-for-times-supported"] = True
|
||||||
features["cluster-id-supported"] = True
|
features["cluster-id-supported"] = True
|
||||||
features["expect-record-too-large-exception"] = False
|
features["expect-record-too-large-exception"] = False
|
||||||
if (broker_version < V_0_11_0_0):
|
if broker_version < V_0_11_0_0:
|
||||||
features["describe-acls-supported"] = False
|
features["describe-acls-supported"] = False
|
||||||
else:
|
else:
|
||||||
features["describe-acls-supported"] = True
|
features["describe-acls-supported"] = True
|
||||||
|
@ -101,6 +101,7 @@ class ClientCompatibilityFeaturesTest(Test):
|
||||||
@parametrize(broker_version=str(LATEST_0_10_0))
|
@parametrize(broker_version=str(LATEST_0_10_0))
|
||||||
@parametrize(broker_version=str(LATEST_0_10_1))
|
@parametrize(broker_version=str(LATEST_0_10_1))
|
||||||
@parametrize(broker_version=str(LATEST_0_10_2))
|
@parametrize(broker_version=str(LATEST_0_10_2))
|
||||||
|
@parametrize(broker_version=str(LATEST_0_11_0))
|
||||||
def run_compatibility_test(self, broker_version):
|
def run_compatibility_test(self, broker_version):
|
||||||
self.zk.start()
|
self.zk.start()
|
||||||
self.kafka.set_version(KafkaVersion(broker_version))
|
self.kafka.set_version(KafkaVersion(broker_version))
|
||||||
|
|
|
@ -22,7 +22,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int_with_prefix
|
from kafkatest.utils import is_int_with_prefix
|
||||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, KafkaVersion
|
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, KafkaVersion
|
||||||
|
|
||||||
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
||||||
"""
|
"""
|
||||||
|
@ -56,6 +56,7 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
||||||
@parametrize(broker_version=str(LATEST_0_10_0))
|
@parametrize(broker_version=str(LATEST_0_10_0))
|
||||||
@parametrize(broker_version=str(LATEST_0_10_1))
|
@parametrize(broker_version=str(LATEST_0_10_1))
|
||||||
@parametrize(broker_version=str(LATEST_0_10_2))
|
@parametrize(broker_version=str(LATEST_0_10_2))
|
||||||
|
@parametrize(broker_version=str(LATEST_0_11_0))
|
||||||
def test_produce_consume(self, broker_version):
|
def test_produce_consume(self, broker_version):
|
||||||
print("running producer_consumer_compat with broker_version = %s" % broker_version)
|
print("running producer_consumer_compat with broker_version = %s" % broker_version)
|
||||||
self.kafka.set_version(KafkaVersion(broker_version))
|
self.kafka.set_version(KafkaVersion(broker_version))
|
||||||
|
|
|
@ -23,7 +23,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int
|
from kafkatest.utils import is_int
|
||||||
from kafkatest.version import LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, DEV_BRANCH, KafkaVersion
|
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion
|
||||||
|
|
||||||
# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
|
# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
|
||||||
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
|
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
|
||||||
|
@ -44,17 +44,18 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
|
||||||
self.messages_per_producer = 1000
|
self.messages_per_producer = 1000
|
||||||
|
|
||||||
@cluster(num_nodes=6)
|
@cluster(num_nodes=6)
|
||||||
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
|
||||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
|
||||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
|
||||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None)
|
|
||||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
|
|
||||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||||
|
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
|
||||||
|
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||||
|
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
|
||||||
|
@parametrize(producer_version=str(LATEST_0_11_0), consumer_version=str(LATEST_0_11_0), compression_types=["gzip"], timestamp_type=str("CreateTime"))
|
||||||
@parametrize(producer_version=str(LATEST_0_10_2), consumer_version=str(LATEST_0_10_2), compression_types=["lz4"], timestamp_type=str("CreateTime"))
|
@parametrize(producer_version=str(LATEST_0_10_2), consumer_version=str(LATEST_0_10_2), compression_types=["lz4"], timestamp_type=str("CreateTime"))
|
||||||
@parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
@parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||||
@parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
@parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||||
|
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||||
|
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None)
|
||||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
|
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||||
def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
|
def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
|
||||||
|
|
||||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
|
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
|
||||||
|
|
|
@ -25,7 +25,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int
|
from kafkatest.utils import is_int
|
||||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, DEV_BRANCH, KafkaVersion
|
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion
|
||||||
|
|
||||||
class TestUpgrade(ProduceConsumeValidateTest):
|
class TestUpgrade(ProduceConsumeValidateTest):
|
||||||
|
|
||||||
|
@ -62,6 +62,8 @@ class TestUpgrade(ProduceConsumeValidateTest):
|
||||||
self.kafka.start_node(node)
|
self.kafka.start_node(node)
|
||||||
|
|
||||||
@cluster(num_nodes=6)
|
@cluster(num_nodes=6)
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["gzip"], new_consumer=False)
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["lz4"])
|
||||||
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
|
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
|
||||||
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"], new_consumer=False)
|
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"], new_consumer=False)
|
||||||
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"])
|
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"])
|
||||||
|
|
|
@ -63,9 +63,9 @@ get_kafka() {
|
||||||
scala_version=$2
|
scala_version=$2
|
||||||
|
|
||||||
kafka_dir=/opt/kafka-$version
|
kafka_dir=/opt/kafka-$version
|
||||||
url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz
|
url=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz
|
||||||
# the .tgz above does not include the streams test jar hence we need to get it separately
|
# the .tgz above does not include the streams test jar hence we need to get it separately
|
||||||
url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka-streams-$version-test.jar
|
url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
|
||||||
if [ ! -d /opt/kafka-$version ]; then
|
if [ ! -d /opt/kafka-$version ]; then
|
||||||
pushd /tmp
|
pushd /tmp
|
||||||
curl -O $url
|
curl -O $url
|
||||||
|
@ -93,6 +93,8 @@ get_kafka 0.10.1.1 2.11
|
||||||
chmod a+rw /opt/kafka-0.10.1.1
|
chmod a+rw /opt/kafka-0.10.1.1
|
||||||
get_kafka 0.10.2.1 2.11
|
get_kafka 0.10.2.1 2.11
|
||||||
chmod a+rw /opt/kafka-0.10.2.1
|
chmod a+rw /opt/kafka-0.10.2.1
|
||||||
|
get_kafka 0.11.0.0 2.11
|
||||||
|
chmod a+rw /opt/kafka-0.11.0.0
|
||||||
|
|
||||||
|
|
||||||
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
|
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
|
||||||
|
|
Loading…
Reference in New Issue