KAFKA-2642; Run replication tests with SSL and SASL clients

For SSL and SASL replication tests, set security protocol for clients as well.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ben Stopford <benstopford@gmail.com>, Geoff Anderson <geoff@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #563 from rajinisivaram/KAFKA-2642
This commit is contained in:
Rajini Sivaram 2015-12-02 08:08:37 -06:00 committed by Jun Rao
parent 69269e76a4
commit cff03f8b68
5 changed files with 9 additions and 8 deletions

View File

@ -32,7 +32,7 @@ class ClientCompatibilityTest(Test):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}})
'configs': {"min.insync.replicas": 2}}})
self.zk.start()
self.kafka.start()

View File

@ -50,7 +50,7 @@ class QuotaTest(Test):
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
security_protocol='PLAINTEXT',
interbroker_security_protocol='PLAINTEXT',
topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}},
topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
quota_config=self.quota_config,
jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],

View File

@ -93,9 +93,9 @@ class ReplicationTest(ProduceConsumeValidateTest):
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}
'configs': {"min.insync.replicas": 2}}
})
self.producer_throughput = 10000
self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
@ -123,10 +123,11 @@ class ReplicationTest(ProduceConsumeValidateTest):
- Validate that every acked message was consumed
"""
self.kafka.security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.start()
self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))

View File

@ -40,7 +40,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}})
'configs': {"min.insync.replicas": 2}}})
self.zk.start()
#reduce replica.lag.time.max.ms due to KAFKA-2827

View File

@ -33,7 +33,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}})
'configs': {"min.insync.replicas": 2}}})
self.zk.start()
self.kafka.start()