diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 788d41bb4f1..33ece3502f8 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -242,7 +242,7 @@ class KafkaService(JmxMixin, Service): cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % { 'zk_connect': self.zk.connect_setting(), 'topic': topic_cfg.get("topic"), - 'partitions': topic_cfg.get('partitions', 1), + 'partitions': topic_cfg.get('partitions', 1), 'replication': topic_cfg.get('replication-factor', 1) } @@ -267,6 +267,15 @@ class KafkaService(JmxMixin, Service): for line in node.account.ssh_capture(cmd): output += line return output + + def alter_message_format(self, topic, msg_format_version, node=None): + if node is None: + node = self.nodes[0] + self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version) + cmd = "/opt/%s/bin/kafka-configs.sh --zookeeper %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \ + (kafka_dir(node), self.zk.connect_setting(), topic, msg_format_version) + self.logger.info("Running alter message format command...\n%s" % cmd) + node.account.ssh(cmd) def parse_describe_topic(self, topic_description): """Parse output of kafka-topics.sh --describe (or describe_topic() method above), which is a string of form @@ -508,4 +517,4 @@ class KafkaService(JmxMixin, Service): for line in node.account.ssh_capture(cmd): output += line self.logger.debug(output) - return output \ No newline at end of file + return output diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py index 761d91ba2b7..dc2582b6a62 100644 --- a/tests/kafkatest/services/kafka/version.py +++ b/tests/kafkatest/services/kafka/version.py @@ -63,3 +63,7 @@ LATEST_0_8_2 = V_0_8_2_2 V_0_9_0_0 = KafkaVersion("0.9.0.0") V_0_9_0_1 = KafkaVersion("0.9.0.1") LATEST_0_9 = V_0_9_0_1 + +# 0.10.0.X versions +V_0_10_0_0 = KafkaVersion("0.10.0.0") +LATEST_0_10 = V_0_10_0_0 diff --git a/tests/kafkatest/tests/message_format_change.py b/tests/kafkatest/tests/message_format_change.py new file mode 100644 index 00000000000..357fd17a9e3 --- /dev/null +++ b/tests/kafkatest/tests/message_format_change.py @@ -0,0 +1,92 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.mark import parametrize +from ducktape.utils.util import wait_until +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.utils import is_int +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.services.kafka import config_property +import time + + +class MessageFormatChangeTest(ProduceConsumeValidateTest): + + def __init__(self, test_context): + super(MessageFormatChangeTest, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.zk = ZookeeperService(self.test_context, num_nodes=1) + + self.zk.start() + + # Producer and consumer + self.producer_throughput = 10000 + self.num_producers = 1 + self.num_consumers = 1 + self.messages_per_producer = 100 + + def produce_and_consume(self, producer_version, consumer_version, group): + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, + throughput=self.producer_throughput, + message_validator=is_int, + version=KafkaVersion(producer_version)) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, consumer_timeout_ms=30000, + message_validator=is_int, version=KafkaVersion(consumer_version)) + self.consumer.group_id = group + self.run_produce_consume_validate(lambda: wait_until( + lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, + timeout_sec=120, backoff_sec=1, + err_msg="Producer did not produce all messages in reasonable amount of time")) + + @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK)) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9)) + def test_compatibility(self, producer_version, consumer_version): + """ This tests performs the following checks: + The workload is a mix of 0.9.x and 0.10.x producers and consumers + that produce to and consume from a 0.10.x cluster + 1. initially the topic is using message format 0.9.0 + 2. change the message format version for topic to 0.10.0 on the fly. + 3. change the message format version for topic back to 0.9.0 on the fly. + - The producers and consumers should not have any issue. + - Note that for 0.9.x consumers/producers we only do steps 1 and 2 + """ + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + 'configs': {"min.insync.replicas": 2}}}) + + self.kafka.start() + self.logger.info("First format change to 0.9.0") + self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) + self.produce_and_consume(producer_version, consumer_version, "group1") + + self.logger.info("Second format change to 0.10.0") + self.kafka.alter_message_format(self.topic, str(LATEST_0_10)) + self.produce_and_consume(producer_version, consumer_version, "group2") + + if producer_version == str(TRUNK) and consumer_version == str(TRUNK): + self.logger.info("Third format change back to 0.9.0") + self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) + self.produce_and_consume(producer_version, consumer_version, "group3") + +