KAFKA-3202: System test that changes message version on the fly

becketqin apovzner please have a look. becketqin the test fails when the producer and consumer are 0.9.x and the message format changes on the fly.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ewen Cheslack-Postava, Ismael Juma, Gwen Shapira

Closes #1070 from enothereska/kafka-3202-format-change-fly
This commit is contained in:
Eno Thereska 2016-03-17 15:37:37 -07:00 committed by Gwen Shapira
parent 579d473ce9
commit f57dabbe56
3 changed files with 107 additions and 2 deletions

View File

@ -268,6 +268,15 @@ class KafkaService(JmxMixin, Service):
output += line
return output
def alter_message_format(self, topic, msg_format_version, node=None):
if node is None:
node = self.nodes[0]
self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)
cmd = "/opt/%s/bin/kafka-configs.sh --zookeeper %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
(kafka_dir(node), self.zk.connect_setting(), topic, msg_format_version)
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)
def parse_describe_topic(self, topic_description):
"""Parse output of kafka-topics.sh --describe (or describe_topic() method above), which is a string of form
PartitionCount:2\tReplicationFactor:2\tConfigs:

View File

@ -63,3 +63,7 @@ LATEST_0_8_2 = V_0_8_2_2
V_0_9_0_0 = KafkaVersion("0.9.0.0")
V_0_9_0_1 = KafkaVersion("0.9.0.1")
LATEST_0_9 = V_0_9_0_1
# 0.10.0.X versions
V_0_10_0_0 = KafkaVersion("0.10.0.0")
LATEST_0_10 = V_0_10_0_0

View File

@ -0,0 +1,92 @@
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils import is_int
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.services.kafka import config_property
import time
class MessageFormatChangeTest(ProduceConsumeValidateTest):
def __init__(self, test_context):
super(MessageFormatChangeTest, self).__init__(test_context=test_context)
def setUp(self):
self.topic = "test_topic"
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
# Producer and consumer
self.producer_throughput = 10000
self.num_producers = 1
self.num_consumers = 1
self.messages_per_producer = 100
def produce_and_consume(self, producer_version, consumer_version, group):
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
self.topic,
throughput=self.producer_throughput,
message_validator=is_int,
version=KafkaVersion(producer_version))
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic, consumer_timeout_ms=30000,
message_validator=is_int, version=KafkaVersion(consumer_version))
self.consumer.group_id = group
self.run_produce_consume_validate(lambda: wait_until(
lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
timeout_sec=120, backoff_sec=1,
err_msg="Producer did not produce all messages in reasonable amount of time"))
@parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
def test_compatibility(self, producer_version, consumer_version):
""" This tests performs the following checks:
The workload is a mix of 0.9.x and 0.10.x producers and consumers
that produce to and consume from a 0.10.x cluster
1. initially the topic is using message format 0.9.0
2. change the message format version for topic to 0.10.0 on the fly.
3. change the message format version for topic back to 0.9.0 on the fly.
- The producers and consumers should not have any issue.
- Note that for 0.9.x consumers/producers we only do steps 1 and 2
"""
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
'configs': {"min.insync.replicas": 2}}})
self.kafka.start()
self.logger.info("First format change to 0.9.0")
self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
self.produce_and_consume(producer_version, consumer_version, "group1")
self.logger.info("Second format change to 0.10.0")
self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
self.produce_and_consume(producer_version, consumer_version, "group2")
if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
self.logger.info("Third format change back to 0.9.0")
self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
self.produce_and_consume(producer_version, consumer_version, "group3")