KAFKA-2644; Run relevant ducktape tests with SASL_PLAINTEXT and SASL_SSL

Run sanity check, replication tests and benchmarks with SASL/Kerberos using MiniKdc.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Geoff Anderson <geoff@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #358 from rajinisivaram/KAFKA-2644
This commit is contained in:
Rajini Sivaram 2015-11-03 21:25:15 -08:00 committed by Jun Rao
parent 596c203af1
commit 98db5ea94f
19 changed files with 343 additions and 114 deletions

View File

@ -323,7 +323,7 @@ project(':core') {
} }
jar { jar {
dependsOn 'copyDependantLibs' dependsOn('copyDependantLibs', 'copyDependantTestLibs')
} }
jar.manifest { jar.manifest {
@ -347,6 +347,13 @@ project(':core') {
artifacts { artifacts {
archives testJar archives testJar
} }
tasks.create(name: "copyDependantTestLibs", type: Copy) {
from (configurations.testRuntime) {
include('*.jar')
}
into "$buildDir/dependant-testlibs"
}
} }
project(':contrib:hadoop-consumer') { project(':contrib:hadoop-consumer') {

View File

@ -24,7 +24,7 @@ from kafkatest.services.kafka.version import LATEST_0_8_2
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils.remote_account import line_count, file_exists from kafkatest.utils.remote_account import line_count, file_exists
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
import time import time
@ -44,9 +44,9 @@ class ConsoleConsumerTest(Test):
def setUp(self): def setUp(self):
self.zk.start() self.zk.start()
@parametrize(security_protocol=SecurityConfig.SSL, new_consumer=True) @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@matrix(security_protocol=[SecurityConfig.PLAINTEXT], new_consumer=[False, True]) @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
def test_lifecycle(self, security_protocol, new_consumer): def test_lifecycle(self, security_protocol, new_consumer=True):
"""Check that console consumer starts/stops properly, and that we are capturing log output.""" """Check that console consumer starts/stops properly, and that we are capturing log output."""
self.kafka.security_protocol = security_protocol self.kafka.security_protocol = security_protocol

View File

@ -19,7 +19,7 @@ from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2 from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
import itertools import itertools
import os import os
@ -99,7 +99,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
"collect_default": True} "collect_default": True}
} }
def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, new_consumer=False, message_validator=None, def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None,
from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]): from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
""" """
Args: Args:
@ -107,7 +107,6 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
num_nodes: number of nodes to use (this should be 1) num_nodes: number of nodes to use (this should be 1)
kafka: kafka service kafka: kafka service
topic: consume from this topic topic: consume from this topic
security_protocol: security protocol for Kafka connections
new_consumer: use new Kafka consumer if True new_consumer: use new Kafka consumer if True
message_validator: function which returns message or None message_validator: function which returns message or None
from_beginning: consume from beginning if True, else from the end from_beginning: consume from beginning if True, else from the end
@ -132,13 +131,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
self.message_validator = message_validator self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
self.client_id = client_id self.client_id = client_id
self.security_protocol = security_protocol
# Validate a few configs
if self.new_consumer is None:
self.new_consumer = self.security_protocol == SecurityConfig.SSL
if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
raise Exception("SSL protocol is supported only with the new consumer")
def prop_file(self, node): def prop_file(self, node):
"""Return a string which can be used to create a configuration file appropriate for the given node.""" """Return a string which can be used to create a configuration file appropriate for the given node."""
@ -151,8 +144,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
# Add security properties to the config. If security protocol is not specified, # Add security properties to the config. If security protocol is not specified,
# use the default in the template properties. # use the default in the template properties.
self.security_config = SecurityConfig(self.security_protocol, prop_file) self.security_config = self.kafka.security_config.client_config(prop_file)
self.security_protocol = self.security_config.security_protocol
prop_file += str(self.security_config) prop_file += str(self.security_config)
return prop_file return prop_file
@ -170,10 +162,12 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
args['jmx_port'] = self.jmx_port args['jmx_port'] = self.jmx_port
args['kafka_dir'] = kafka_dir(node) args['kafka_dir'] = kafka_dir(node)
args['broker_list'] = self.kafka.bootstrap_servers() args['broker_list'] = self.kafka.bootstrap_servers()
args['kafka_opts'] = self.security_config.kafka_opts
cmd = "export JMX_PORT=%(jmx_port)s; " \ cmd = "export JMX_PORT=%(jmx_port)s; " \
"export LOG_DIR=%(log_dir)s; " \ "export LOG_DIR=%(log_dir)s; " \
"export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \ "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
"export KAFKA_OPTS=%(kafka_opts)s; " \
"/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \ "/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \
"--topic %(topic)s --consumer.config %(config_file)s" % args "--topic %(topic)s --consumer.config %(config_file)s" % args

View File

@ -22,7 +22,8 @@ from kafkatest.services.kafka.version import TRUNK
from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.security.minikdc import MiniKdc
import json import json
import re import re
import signal import signal
@ -45,7 +46,7 @@ class KafkaService(JmxMixin, Service):
} }
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]): sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
""" """
:type context :type context
:type zk: ZookeeperService :type zk: ZookeeperService
@ -59,6 +60,7 @@ class KafkaService(JmxMixin, Service):
self.security_protocol = security_protocol self.security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol self.interbroker_security_protocol = interbroker_security_protocol
self.sasl_mechanism = sasl_mechanism
self.topics = topics self.topics = topics
for node in self.nodes: for node in self.nodes:
@ -67,16 +69,15 @@ class KafkaService(JmxMixin, Service):
@property @property
def security_config(self): def security_config(self):
if self.security_protocol == SecurityConfig.SSL or self.interbroker_security_protocol == SecurityConfig.SSL: return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, sasl_mechanism=self.sasl_mechanism)
return SecurityConfig(SecurityConfig.SSL)
else:
return SecurityConfig(SecurityConfig.PLAINTEXT)
@property
def port(self):
return 9092 if self.security_protocol == SecurityConfig.PLAINTEXT else 9093
def start(self): def start(self):
if self.security_config.has_sasl_kerberos:
self.minikdc = MiniKdc(self.context, self.nodes)
self.minikdc.start()
else:
self.minikdc = None
Service.start(self) Service.start(self)
# Create topics if necessary # Create topics if necessary
@ -96,12 +97,15 @@ class KafkaService(JmxMixin, Service):
# TODO - clean up duplicate configuration logic # TODO - clean up duplicate configuration logic
prop_file = cfg.render() prop_file = cfg.render()
prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node), prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
security_config=self.security_config, port=self.port) security_config=self.security_config,
interbroker_security_protocol=self.interbroker_security_protocol,
sasl_mechanism=self.sasl_mechanism)
return prop_file return prop_file
def start_cmd(self, node): def start_cmd(self, node):
cmd = "export JMX_PORT=%d; " % self.jmx_port cmd = "export JMX_PORT=%d; " % self.jmx_port
cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; " cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &" cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
return cmd return cmd
@ -296,8 +300,7 @@ class KafkaService(JmxMixin, Service):
def bootstrap_servers(self): def bootstrap_servers(self):
"""Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,... """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
using the port for the configured security protocol.
This is the format expected by many config files. This is the format expected by many config files.
""" """
return ','.join([node.account.hostname + ":" + str(self.port) for node in self.nodes]) return ','.join([node.account.hostname + ":9092" for node in self.nodes])

View File

@ -18,11 +18,11 @@
advertised.host.name={{ node.account.hostname }} advertised.host.name={{ node.account.hostname }}
{% if security_protocol == interbroker_security_protocol %} {% if security_protocol == interbroker_security_protocol %}
listeners={{ security_protocol }}://:{{ port }} listeners={{ security_protocol }}://:9092
advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }} advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092
{% else %} {% else %}
listeners=PLAINTEXT://:9092,SSL://:9093 listeners={{ security_protocol }}://:9092,{{ interbroker_security_protocol }}://:9093
advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093 advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092,{{ interbroker_security_protocol }}://{{ node.account.hostname }}:9093
{% endif %} {% endif %}
num.network.threads=3 num.network.threads=3
@ -56,10 +56,12 @@ quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_p
security.inter.broker.protocol={{ interbroker_security_protocol }} security.inter.broker.protocol={{ interbroker_security_protocol }}
ssl.keystore.location=/mnt/ssl/test.keystore.jks ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.keystore.password=test-ks-passwd ssl.keystore.password=test-ks-passwd
ssl.key.password=test-key-passwd ssl.key.password=test-key-passwd
ssl.keystore.type=JKS ssl.keystore.type=JKS
ssl.truststore.location=/mnt/ssl/test.truststore.jks ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS ssl.truststore.type=JKS
sasl.mechanism={{ sasl_mechanism }}
sasl.kerberos.service.name=kafka

View File

@ -16,7 +16,7 @@
from ducktape.services.background_thread import BackgroundThreadService from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
class KafkaLog4jAppender(BackgroundThreadService): class KafkaLog4jAppender(BackgroundThreadService):

View File

@ -15,7 +15,7 @@
from kafkatest.services.performance import PerformanceService from kafkatest.services.performance import PerformanceService
from kafkatest.services.kafka.directory import kafka_dir from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
import os import os
@ -69,11 +69,10 @@ class ConsumerPerformanceService(PerformanceService):
"collect_default": True} "collect_default": True}
} }
def __init__(self, context, num_nodes, kafka, security_protocol, topic, messages, new_consumer=False, settings={}): def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes) super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka self.kafka = kafka
self.security_config = SecurityConfig(security_protocol) self.security_config = kafka.security_config.client_config()
self.security_protocol = security_protocol
self.topic = topic self.topic = topic
self.messages = messages self.messages = messages
self.new_consumer = new_consumer self.new_consumer = new_consumer
@ -123,6 +122,7 @@ class ConsumerPerformanceService(PerformanceService):
def start_cmd(self, node): def start_cmd(self, node):
cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node) cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
for key, value in self.args.items(): for key, value in self.args.items():

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
from kafkatest.services.performance import PerformanceService from kafkatest.services.performance import PerformanceService
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka.directory import kafka_dir from kafkatest.services.kafka.directory import kafka_dir
@ -27,34 +27,35 @@ class EndToEndLatencyService(PerformanceService):
"collect_default": True}, "collect_default": True},
} }
def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, consumer_fetch_max_wait=100, acks=1): def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes) super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka self.kafka = kafka
self.security_config = SecurityConfig(security_protocol) self.security_config = kafka.security_config.client_config()
self.security_protocol = security_protocol
self.args = { self.args = {
'topic': topic, 'topic': topic,
'num_records': num_records, 'num_records': num_records,
'consumer_fetch_max_wait': consumer_fetch_max_wait, 'consumer_fetch_max_wait': consumer_fetch_max_wait,
'acks': acks 'acks': acks,
'kafka_opts': self.security_config.kafka_opts
} }
def _worker(self, idx, node): def _worker(self, idx, node):
args = self.args.copy() args = self.args.copy()
self.security_config.setup_node(node) self.security_config.setup_node(node)
if self.security_protocol == SecurityConfig.SSL: if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
ssl_config_file = SecurityConfig.SSL_DIR + "/security.properties" security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties"
node.account.create_file(ssl_config_file, str(self.security_config)) node.account.create_file(security_config_file, str(self.security_config))
else: else:
ssl_config_file = "" security_config_file = ""
args.update({ args.update({
'zk_connect': self.kafka.zk.connect_setting(), 'zk_connect': self.kafka.zk.connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(), 'bootstrap_servers': self.kafka.bootstrap_servers(),
'ssl_config_file': ssl_config_file 'security_config_file': security_config_file,
'kafka_dir': kafka_dir(node)
}) })
cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % kafka_dir(node) cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(ssl_config_file)s" % args cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(security_config_file)s" % args
cmd += " | tee /mnt/end-to-end-latency.log" cmd += " | tee /mnt/end-to-end-latency.log"
self.logger.debug("End-to-end latency %d command: %s", idx, cmd) self.logger.debug("End-to-end latency %d command: %s", idx, cmd)

View File

@ -16,7 +16,7 @@
from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.performance import PerformanceService from kafkatest.services.performance import PerformanceService
import itertools import itertools
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka.directory import kafka_dir from kafkatest.services.kafka.directory import kafka_dir
class ProducerPerformanceService(JmxMixin, PerformanceService): class ProducerPerformanceService(JmxMixin, PerformanceService):
@ -27,15 +27,15 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
"collect_default": True}, "collect_default": True},
} }
def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={},
intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]): intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes) JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
PerformanceService.__init__(self, context, num_nodes) PerformanceService.__init__(self, context, num_nodes)
self.kafka = kafka self.kafka = kafka
self.security_config = SecurityConfig(security_protocol) self.security_config = kafka.security_config.client_config()
self.security_protocol = security_protocol
self.args = { self.args = {
'topic': topic, 'topic': topic,
'kafka_opts': self.security_config.kafka_opts,
'num_records': num_records, 'num_records': num_records,
'record_size': record_size, 'record_size': record_size,
'throughput': throughput 'throughput': throughput
@ -52,11 +52,11 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
'client_id': self.client_id, 'client_id': self.client_id,
'kafka_directory': kafka_dir(node) 'kafka_directory': kafka_dir(node)
}) })
cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \ cmd = "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
"--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
self.security_config.setup_node(node) self.security_config.setup_node(node)
if self.security_protocol == SecurityConfig.SSL: if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
self.settings.update(self.security_config.properties) self.settings.update(self.security_config.properties)
for key, value in self.settings.items(): for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value)) cmd += " %s=%s" % (str(key), str(value))

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
from kafkatest.services.kafka.directory import kafka_dir
import os
class MiniKdc(Service):
logs = {
"minikdc_log": {
"path": "/mnt/minikdc/minikdc.log",
"collect_default": True}
}
WORK_DIR = "/mnt/minikdc"
PROPS_FILE = "/mnt/minikdc/minikdc.properties"
KEYTAB_FILE = "/mnt/minikdc/keytab"
KRB5CONF_FILE = "/mnt/minikdc/krb5.conf"
LOG_FILE = "/mnt/minikdc/minikdc.log"
LOCAL_KEYTAB_FILE = "/tmp/keytab"
LOCAL_KRB5CONF_FILE = "/tmp/krb5.conf"
def __init__(self, context, kafka_nodes):
super(MiniKdc, self).__init__(context, 1)
self.kafka_nodes = kafka_nodes
def start_node(self, node):
node.account.ssh("mkdir -p %s" % MiniKdc.WORK_DIR, allow_fail=False)
props_file = self.render('minikdc.properties', node=node)
node.account.create_file(MiniKdc.PROPS_FILE, props_file)
self.logger.info("minikdc.properties")
self.logger.info(props_file)
kafka_principals = ' '.join(['kafka/' + kafka_node.account.hostname for kafka_node in self.kafka_nodes])
principals = 'client ' + kafka_principals
self.logger.info("Starting MiniKdc with principals " + principals)
lib_dir = "/opt/%s/core/build/dependant-testlibs" % kafka_dir(node)
kdc_jars = node.account.ssh_capture("ls " + lib_dir)
classpath = ":".join([os.path.join(lib_dir, jar.strip()) for jar in kdc_jars])
cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh org.apache.hadoop.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
self.logger.debug("Attempting to start MiniKdc on %s with command: %s" % (str(node.account), cmd))
with node.account.monitor_log(MiniKdc.LOG_FILE) as monitor:
node.account.ssh(cmd)
monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup")
node.account.scp_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
node.account.scp_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
def stop_node(self, node):
self.logger.info("Stopping %s on %s" % (type(self).__name__, node.account.hostname))
node.account.kill_process("apacheds", allow_fail=False)
def clean_node(self, node):
node.account.kill_process("apacheds", clean_shutdown=False, allow_fail=False)
node.account.ssh("rm -rf " + MiniKdc.WORK_DIR, allow_fail=False)
if os.path.exists(MiniKdc.LOCAL_KEYTAB_FILE):
os.remove(MiniKdc.LOCAL_KEYTAB_FILE)
if os.path.exists(MiniKdc.LOCAL_KRB5CONF_FILE):
os.remove(MiniKdc.LOCAL_KRB5CONF_FILE)

View File

@ -15,7 +15,8 @@
import os import os
import subprocess import subprocess
from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
class Keytool(object): class Keytool(object):
@ -56,17 +57,24 @@ class Keytool(object):
raise subprocess.CalledProcessError(proc.returncode, cmd) raise subprocess.CalledProcessError(proc.returncode, cmd)
class SecurityConfig(object): class SecurityConfig(TemplateRenderer):
PLAINTEXT = 'PLAINTEXT' PLAINTEXT = 'PLAINTEXT'
SSL = 'SSL' SSL = 'SSL'
SSL_DIR = "/mnt/ssl" SASL_PLAINTEXT = 'SASL_PLAINTEXT'
KEYSTORE_PATH = "/mnt/ssl/test.keystore.jks" SASL_SSL = 'SASL_SSL'
TRUSTSTORE_PATH = "/mnt/ssl/test.truststore.jks" SASL_MECHANISM_GSSAPI = 'GSSAPI'
SASL_MECHANISM_PLAIN = 'PLAIN'
CONFIG_DIR = "/mnt/security"
KEYSTORE_PATH = "/mnt/security/test.keystore.jks"
TRUSTSTORE_PATH = "/mnt/security/test.truststore.jks"
JAAS_CONF_PATH = "/mnt/security/jaas.conf"
KRB5CONF_PATH = "/mnt/security/krb5.conf"
KEYTAB_PATH = "/mnt/security/keytab"
ssl_stores = Keytool.generate_keystore_truststore('.') ssl_stores = Keytool.generate_keystore_truststore('.')
def __init__(self, security_protocol, template_props=""): def __init__(self, security_protocol, interbroker_security_protocol=None, sasl_mechanism=SASL_MECHANISM_GSSAPI, template_props=""):
""" """
Initialize the security properties for the node and copy Initialize the security properties for the node and copy
keystore and truststore to the remote node if the transport protocol keystore and truststore to the remote node if the transport protocol
@ -79,27 +87,52 @@ class SecurityConfig(object):
security_protocol = self.get_property('security.protocol', template_props) security_protocol = self.get_property('security.protocol', template_props)
if security_protocol is None: if security_protocol is None:
security_protocol = SecurityConfig.PLAINTEXT security_protocol = SecurityConfig.PLAINTEXT
elif security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL]: elif security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]:
raise Exception("Invalid security.protocol in template properties: " + security_protocol) raise Exception("Invalid security.protocol in template properties: " + security_protocol)
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol)
self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
self.properties = { self.properties = {
'security.protocol' : security_protocol, 'security.protocol' : security_protocol,
'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH, 'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'], 'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'],
'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'], 'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH, 'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'] 'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
'sasl.mechanism' : sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
} }
def client_config(self, template_props=""):
return SecurityConfig(self.security_protocol, sasl_mechanism=self.sasl_mechanism, template_props=template_props)
def setup_node(self, node): def setup_node(self, node):
if self.security_protocol == SecurityConfig.SSL: if self.has_ssl:
node.account.ssh("mkdir -p %s" % SecurityConfig.SSL_DIR, allow_fail=False) node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH) node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH) node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)
if self.has_sasl:
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
jaas_conf_file = self.sasl_mechanism.lower() + "_jaas.conf"
java_version = node.account.ssh_capture("java -version")
if any('IBM' in line for line in java_version):
is_ibm_jdk = True
else:
is_ibm_jdk = False
jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk)
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
if self.has_sasl_kerberos:
node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
node.account.scp_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
def clean_node(self, node): def clean_node(self, node):
if self.security_protocol == SecurityConfig.SSL: if self.security_protocol != SecurityConfig.PLAINTEXT:
node.account.ssh("rm -rf %s" % SecurityConfig.SSL_DIR, allow_fail=False) node.account.ssh("rm -rf %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
def get_property(self, prop_name, template_props=""): def get_property(self, prop_name, template_props=""):
""" """
@ -113,10 +146,31 @@ class SecurityConfig(object):
value = str(items[1].strip()) value = str(items[1].strip())
return value return value
def is_ssl(self, security_protocol):
return security_protocol == SecurityConfig.SSL or security_protocol == SecurityConfig.SASL_SSL
def is_sasl(self, security_protocol):
return security_protocol == SecurityConfig.SASL_PLAINTEXT or security_protocol == SecurityConfig.SASL_SSL
@property @property
def security_protocol(self): def security_protocol(self):
return self.properties['security.protocol'] return self.properties['security.protocol']
@property
def sasl_mechanism(self):
return self.properties['sasl.mechanism']
@property
def has_sasl_kerberos(self):
return self.has_sasl and self.sasl_mechanism == SecurityConfig.SASL_MECHANISM_GSSAPI
@property
def kafka_opts(self):
if self.has_sasl:
return "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH)
else:
return ""
def __str__(self): def __str__(self):
""" """
Return properties as string with line separators. Return properties as string with line separators.
@ -125,7 +179,7 @@ class SecurityConfig(object):
""" """
prop_str = "" prop_str = ""
if self.security_protocol == SecurityConfig.SSL: if self.security_protocol != SecurityConfig.PLAINTEXT:
for key, value in self.properties.items(): for key, value in self.properties.items():
prop_str += ("\n" + key + "=" + value) prop_str += ("\n" + key + "=" + value)
prop_str += "\n" prop_str += "\n"

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
{% if is_ibm_jdk %}
KafkaClient {
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="client@EXAMPLE.COM";
};
KafkaServer {
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
};
{% else %}
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="client@EXAMPLE.COM";
};
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
};
{% endif %}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
kdc.bind.address={{ node.account.hostname }}

View File

@ -17,7 +17,7 @@ from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2 from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
import json import json
import os import os
@ -46,7 +46,7 @@ class VerifiableProducer(BackgroundThreadService):
"collect_default": True} "collect_default": True}
} }
def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, max_messages=-1, throughput=100000, version=TRUNK): def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000, version=TRUNK):
super(VerifiableProducer, self).__init__(context, num_nodes) super(VerifiableProducer, self).__init__(context, num_nodes)
self.log_level = "TRACE" self.log_level = "TRACE"
@ -61,8 +61,7 @@ class VerifiableProducer(BackgroundThreadService):
self.not_acked_values = [] self.not_acked_values = []
self.prop_file = "" self.prop_file = ""
self.security_config = SecurityConfig(security_protocol, self.prop_file) self.security_config = kafka.security_config.client_config(self.prop_file)
self.security_protocol = self.security_config.security_protocol
self.prop_file += str(self.security_config) self.prop_file += str(self.security_config)
def _worker(self, idx, node): def _worker(self, idx, node):
@ -120,6 +119,7 @@ class VerifiableProducer(BackgroundThreadService):
cmd += "export CLASSPATH; " cmd += "export CLASSPATH; "
cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \ cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
" --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())

View File

@ -80,7 +80,7 @@ class Benchmark(Test):
nrecords = int(self.target_data_size / message_size) nrecords = int(self.target_data_size / message_size)
self.producer = ProducerPerformanceService( self.producer = ProducerPerformanceService(
self.test_context, num_producers, self.kafka, security_protocol=security_protocol, topic=topic, self.test_context, num_producers, self.kafka, topic=topic,
num_records=nrecords, record_size=message_size, throughput=-1, num_records=nrecords, record_size=message_size, throughput=-1,
settings={ settings={
'acks': acks, 'acks': acks,
@ -89,9 +89,9 @@ class Benchmark(Test):
self.producer.run() self.producer.run()
return compute_aggregate_throughput(self.producer) return compute_aggregate_throughput(self.producer)
@parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT') @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL']) @matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol): def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
""" """
Setup: 1 node zk + 3 node kafka cluster Setup: 1 node zk + 3 node kafka cluster
Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1. Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
@ -100,9 +100,11 @@ class Benchmark(Test):
(This runs ProducerPerformance.java under the hood) (This runs ProducerPerformance.java under the hood)
""" """
self.start_kafka(security_protocol, security_protocol) if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
self.start_kafka(security_protocol, interbroker_security_protocol)
self.producer = ProducerPerformanceService( self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka, security_protocol=security_protocol, self.test_context, 1, self.kafka,
topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE, topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}, throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
intermediate_stats=True intermediate_stats=True
@ -132,10 +134,10 @@ class Benchmark(Test):
self.logger.info("\n".join(summary)) self.logger.info("\n".join(summary))
return data return data
@parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT') @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL']) @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol): def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
""" """
Setup: 1 node zk + 3 node kafka cluster Setup: 1 node zk + 3 node kafka cluster
Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3, Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
@ -145,19 +147,21 @@ class Benchmark(Test):
(Under the hood, this simply runs EndToEndLatency.scala) (Under the hood, this simply runs EndToEndLatency.scala)
""" """
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
self.start_kafka(security_protocol, interbroker_security_protocol) self.start_kafka(security_protocol, interbroker_security_protocol)
self.logger.info("BENCHMARK: End to end latency") self.logger.info("BENCHMARK: End to end latency")
self.perf = EndToEndLatencyService( self.perf = EndToEndLatencyService(
self.test_context, 1, self.kafka, self.test_context, 1, self.kafka,
topic=TOPIC_REP_THREE, security_protocol=security_protocol, num_records=10000 topic=TOPIC_REP_THREE, num_records=10000
) )
self.perf.run() self.perf.run()
return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
@parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL') @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT']) @matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_producer_and_consumer(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT'): def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
""" """
Setup: 1 node zk + 3 node kafka cluster Setup: 1 node zk + 3 node kafka cluster
Concurrently produce and consume 10e6 messages with a single producer and a single consumer, Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
@ -167,17 +171,19 @@ class Benchmark(Test):
(Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
""" """
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
self.start_kafka(security_protocol, interbroker_security_protocol) self.start_kafka(security_protocol, interbroker_security_protocol)
num_records = 10 * 1000 * 1000 # 10e6 num_records = 10 * 1000 * 1000 # 10e6
self.producer = ProducerPerformanceService( self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka, self.test_context, 1, self.kafka,
topic=TOPIC_REP_THREE, security_protocol=security_protocol, topic=TOPIC_REP_THREE,
num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
) )
self.consumer = ConsumerPerformanceService( self.consumer = ConsumerPerformanceService(
self.test_context, 1, self.kafka, security_protocol, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
Service.run_parallel(self.producer, self.consumer) Service.run_parallel(self.producer, self.consumer)
data = { data = {
@ -190,21 +196,23 @@ class Benchmark(Test):
self.logger.info("\n".join(summary)) self.logger.info("\n".join(summary))
return data return data
@parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL') @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT']) @matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_consumer_throughput(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT', num_consumers=1): def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
""" """
Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
(using new consumer iff new_consumer == True), and report throughput. (using new consumer iff new_consumer == True), and report throughput.
""" """
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
self.start_kafka(security_protocol, interbroker_security_protocol) self.start_kafka(security_protocol, interbroker_security_protocol)
num_records = 10 * 1000 * 1000 # 10e6 num_records = 10 * 1000 * 1000 # 10e6
# seed kafka w/messages # seed kafka w/messages
self.producer = ProducerPerformanceService( self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka, self.test_context, 1, self.kafka,
topic=TOPIC_REP_THREE, security_protocol=security_protocol, topic=TOPIC_REP_THREE,
num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
) )
@ -213,7 +221,7 @@ class Benchmark(Test):
# consume # consume
self.consumer = ConsumerPerformanceService( self.consumer = ConsumerPerformanceService(
self.test_context, num_consumers, self.kafka, self.test_context, num_consumers, self.kafka,
topic=TOPIC_REP_THREE, security_protocol=security_protocol, new_consumer=new_consumer, messages=num_records) topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
self.consumer.group = "test-consumer-group" self.consumer.group = "test-consumer-group"
self.consumer.run() self.consumer.run()
return compute_aggregate_throughput(self.consumer) return compute_aggregate_throughput(self.consumer)

View File

@ -22,7 +22,7 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import KafkaService
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
from kafkatest.utils.security_config import SecurityConfig from kafkatest.services.security.security_config import SecurityConfig
TOPIC = "topic-log4j-appender" TOPIC = "topic-log4j-appender"
MAX_MESSAGES = 100 MAX_MESSAGES = 100
@ -59,7 +59,7 @@ class Log4jAppenderTest(Test):
def start_consumer(self, security_protocol): def start_consumer(self, security_protocol):
enable_new_consumer = security_protocol == SecurityConfig.SSL enable_new_consumer = security_protocol == SecurityConfig.SSL
self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
consumer_timeout_ms=1000, new_consumer=enable_new_consumer, security_protocol=security_protocol) consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
self.consumer.start() self.consumer.start()
@matrix(security_protocol=['PLAINTEXT', 'SSL']) @matrix(security_protocol=['PLAINTEXT', 'SSL'])
@ -83,4 +83,4 @@ class Log4jAppenderTest(Test):
wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count, timeout_sec=10, wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count, timeout_sec=10,
err_msg="Timed out waiting to consume expected number of messages.") err_msg="Timed out waiting to consume expected number of messages.")
self.consumer.stop() self.consumer.stop()

View File

@ -45,13 +45,11 @@ class QuotaTest(Test):
self.maximum_broker_deviation_percentage = 5.0 self.maximum_broker_deviation_percentage = 5.0
self.num_records = 100000 self.num_records = 100000
self.record_size = 3000 self.record_size = 3000
self.security_protocol = 'PLAINTEXT'
self.interbroker_security_protocol = 'PLAINTEXT'
self.zk = ZookeeperService(test_context, num_nodes=1) self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
security_protocol=self.security_protocol, security_protocol='PLAINTEXT',
interbroker_security_protocol=self.interbroker_security_protocol, interbroker_security_protocol='PLAINTEXT',
topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}}, topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}},
quota_config=self.quota_config, quota_config=self.quota_config,
jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec', jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
@ -74,7 +72,7 @@ class QuotaTest(Test):
def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1): def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
# Produce all messages # Produce all messages
producer = ProducerPerformanceService( producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol, self.test_context, producer_num, self.kafka,
topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id, topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id,
jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate']) jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate'])
@ -82,7 +80,7 @@ class QuotaTest(Test):
# Consume all messages # Consume all messages
consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic, consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
security_protocol=self.security_protocol, new_consumer=False, new_consumer=False,
consumer_timeout_ms=60000, client_id=consumer_id, consumer_timeout_ms=60000, client_id=consumer_id,
jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id], jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id],
jmx_attributes=['OneMinuteRate']) jmx_attributes=['OneMinuteRate'])

View File

@ -108,8 +108,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
interbroker_security_protocol=["PLAINTEXT", "SSL"]) security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
def test_replication_with_broker_failure(self, failure_mode, interbroker_security_protocol="PLAINTEXT"): def test_replication_with_broker_failure(self, failure_mode, security_protocol):
"""Replication tests. """Replication tests.
These tests verify that replication provides simple durability guarantees by checking that data acked by These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios. brokers is still available for consumption in the face of various failure scenarios.
@ -122,11 +122,11 @@ class ReplicationTest(ProduceConsumeValidateTest):
- When done driving failures, stop producing, and finish consuming - When done driving failures, stop producing, and finish consuming
- Validate that every acked message was consumed - Validate that every acked message was consumed
""" """
client_security_protocol = 'PLAINTEXT'
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=client_security_protocol, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=client_security_protocol, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.interbroker_security_protocol = interbroker_security_protocol self.kafka.security_protocol = 'PLAINTEXT'
self.kafka.interbroker_security_protocol = security_protocol
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.start() self.kafka.start()
self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self)) self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))