KAFKA-2643: Run mirror maker ducktape tests with SSL and SASL

Run tests with SSL, SASL_PLAINTEXT and SASL_SSL. Same security protocol is used for source and target Kafka.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Geoff Andreson, Ben Stopford

Closes #559 from rajinisivaram/KAFKA-2643
This commit is contained in:
Rajini Sivaram 2015-11-25 15:05:31 -08:00 committed by Gwen Shapira
parent 34a6be2ccd
commit 69a1cced49
4 changed files with 41 additions and 14 deletions

View File

@ -71,6 +71,7 @@ class KafkaService(JmxMixin, Service):
self.interbroker_security_protocol = interbroker_security_protocol
self.sasl_mechanism = sasl_mechanism
self.topics = topics
self.minikdc = None
for node in self.nodes:
node.version = version
@ -82,10 +83,9 @@ class KafkaService(JmxMixin, Service):
def start(self):
if self.security_config.has_sasl_kerberos:
self.minikdc = MiniKdc(self.context, self.nodes)
self.minikdc.start()
else:
self.minikdc = None
if self.minikdc is None:
self.minikdc = MiniKdc(self.context, self.nodes)
self.minikdc.start()
Service.start(self)
# Create topics if necessary

View File

@ -18,6 +18,7 @@ from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.services.security.security_config import SecurityConfig
import os
import subprocess
@ -113,6 +114,7 @@ class MirrorMaker(Service):
def start_cmd(self, node):
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node)
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
@ -147,16 +149,23 @@ class MirrorMaker(Service):
node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
self.security_config = self.source.security_config.client_config()
self.security_config.setup_node(node)
# Create, upload one consumer config file for source cluster
consumer_props = self.render("mirror_maker_consumer.properties")
consumer_props += str(self.security_config)
node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props)
self.logger.info("Mirrormaker consumer props:\n" + consumer_props)
# Create, upload producer properties file for target cluster
producer_props = self.render('mirror_maker_producer.properties')
producer_props += str(self.security_config)
self.logger.info("Mirrormaker producer props:\n" + producer_props)
node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props)
# Create and upload log properties
log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE)
node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
@ -180,3 +189,4 @@ class MirrorMaker(Service):
(self.__class__.__name__, node.account))
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)

View File

@ -55,15 +55,15 @@ class VerifiableProducer(BackgroundThreadService):
node.version = version
self.acked_values = []
self.not_acked_values = []
self.prop_file = ""
self.security_config = kafka.security_config.client_config(self.prop_file)
self.prop_file += str(self.security_config)
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
# Create and upload log properties
self.security_config = self.kafka.security_config.client_config(self.prop_file)
self.prop_file += str(self.security_config)
log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)

View File

@ -21,6 +21,7 @@ from kafkatest.services.kafka import KafkaService
from kafkatest.services.console_consumer import ConsoleConsumer, is_int
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.mirror_maker import MirrorMaker
from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
import time
@ -39,7 +40,6 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
# This will produce to source kafka cluster
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
throughput=1000)
@ -52,10 +52,21 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
def setUp(self):
# Source cluster
self.source_zk.start()
self.source_kafka.start()
# Target cluster
self.target_zk.start()
def start_kafka(self, security_protocol):
self.source_kafka.security_protocol = security_protocol
self.source_kafka.interbroker_security_protocol = security_protocol
self.target_kafka.security_protocol = security_protocol
self.target_kafka.interbroker_security_protocol = security_protocol
if self.source_kafka.security_config.has_sasl_kerberos:
minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
self.source_kafka.minikdc = minikdc
self.target_kafka.minikdc = minikdc
minikdc.start()
self.source_kafka.start()
self.target_kafka.start()
def bounce(self, clean_shutdown=True):
@ -98,9 +109,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
@parametrize(new_consumer=True)
@parametrize(new_consumer=False)
def test_simple_end_to_end(self, new_consumer):
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
def test_simple_end_to_end(self, security_protocol, new_consumer):
"""
Test end-to-end behavior under non-failure conditions.
@ -112,6 +123,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
- Consume messages from target.
- Verify that number of consumed messages matches the number produced.
"""
self.start_kafka(security_protocol)
self.consumer.new_consumer = new_consumer
self.mirror_maker.new_consumer = new_consumer
self.mirror_maker.start()
@ -126,8 +140,8 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
self.mirror_maker.stop()
@matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
@matrix(new_consumer=[True], clean_shutdown=[True, False])
def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True):
@matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
"""
Test end-to-end behavior under failure conditions.
@ -145,6 +159,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
# the group until the previous session times out
self.consumer.consumer_timeout_ms = 60000
self.start_kafka(security_protocol)
self.consumer.new_consumer = new_consumer
self.mirror_maker.offsets_storage = offsets_storage
self.mirror_maker.new_consumer = new_consumer
self.mirror_maker.start()