KAFKA-2527; System Test for Quotas in Ducktape

@granders Can you take a look at this quota system test?

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Geoff Anderson, Ewen Cheslack-Postava

Closes #275 from lindong28/KAFKA-2527
Authored by Dong Lin on 2015-10-13 13:54:40 -07:00; committed by Gwen Shapira
parent 36d4469326
commit 123d27e4d0
7 changed files with 340 additions and 26 deletions

tests/kafkatest/services/console_consumer.py

@@ -15,11 +15,13 @@
 from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.utils.util import wait_until
+from kafkatest.services.performance.jmx_mixin import JmxMixin
+from kafkatest.services.performance import PerformanceService
 from kafkatest.utils.security_config import SecurityConfig
 import os
 import subprocess
+import itertools

 def is_int(msg):
     """Default method used to check whether text pulled from console consumer is a message.
@@ -72,7 +74,7 @@ Option Description
     """

-class ConsoleConsumer(BackgroundThreadService):
+class ConsoleConsumer(JmxMixin, PerformanceService):
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/console_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
@@ -94,7 +96,8 @@ class ConsoleConsumer(BackgroundThreadService):
             "collect_default": True}
     }

-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None,
+                 from_beginning=True, consumer_timeout_ms=None, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
         """
         Args:
             context: standard context
@@ -110,7 +113,8 @@ class ConsoleConsumer(BackgroundThreadService):
             waiting for the consumer to stop is a pretty good way to consume all messages
             in a topic.
         """
-        super(ConsoleConsumer, self).__init__(context, num_nodes)
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
+        PerformanceService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.new_consumer = new_consumer
         self.args = {
@@ -122,9 +126,10 @@ class ConsoleConsumer(BackgroundThreadService):
         self.from_beginning = from_beginning
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
+        self.client_id = client_id

         # Process client configuration
-        self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
+        self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms, client_id=self.client_id)

         # Add security properties to the config. If security protocol is not specified,
         # use the default in the template properties.
@@ -143,10 +148,11 @@ class ConsoleConsumer(BackgroundThreadService):
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
         args['config_file'] = ConsoleConsumer.CONFIG_FILE
+        args['jmx_port'] = self.jmx_port
         cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
-        cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
+        cmd += " JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
               " --consumer.config %(config_file)s" % args
         if self.new_consumer:
@@ -173,6 +179,7 @@ class ConsoleConsumer(BackgroundThreadService):
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)

+        # Create and upload config file
         self.logger.info("console_consumer.properties:")
         self.logger.info(self.prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file)
@@ -185,18 +192,24 @@ class ConsoleConsumer(BackgroundThreadService):
         # Run and capture output
         cmd = self.start_cmd
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
+        consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
+        first_line = consumer_output.next()
+        self.start_jmx_tool(idx, node)
+        for line in itertools.chain([first_line], consumer_output):
             msg = line.strip()
             if self.message_validator is not None:
                 msg = self.message_validator(msg)
             if msg is not None:
                 self.messages_consumed[idx].append(msg)
+        self.read_jmx_output(idx, node)

     def start_node(self, node):
-        super(ConsoleConsumer, self).start_node(node)
+        PerformanceService.start_node(self, node)

     def stop_node(self, node):
-        node.account.kill_process("java", allow_fail=True)
+        node.account.kill_process("console_consumer", allow_fail=True)
         wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
                    err_msg="Timed out waiting for consumer to stop.")
@@ -204,7 +217,7 @@ class ConsoleConsumer(BackgroundThreadService):
         if self.alive(node):
             self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
                              (self.__class__.__name__, node.account))
-            node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
+        JmxMixin.clean_node(self, node)
+        PerformanceService.clean_node(self, node)
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)
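A note on the _worker change above: pulling the first line off the SSH capture before starting JmxTool guarantees the consumer JVM is already running (and therefore listening on its JMX port) before the tool attaches; itertools.chain then puts that line back in front of the stream so no output is lost. A minimal standalone sketch of the pattern, with illustrative names only (not from the patch):

import itertools

def captured_output():        # stand-in for node.account.ssh_capture(cmd)
    for line in ["first", "second", "third"]:
        yield line

output = captured_output()
first_line = output.next()    # blocks until the process emits output (Python 2 iterator API)
# ...safe to attach monitoring here, as start_jmx_tool(idx, node) does...
for line in itertools.chain([first_line], output):
    print(line)               # handles "first", "second", "third" in order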

tests/kafkatest/services/kafka.py

@@ -15,15 +15,15 @@
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
+from kafkatest.services.performance.jmx_mixin import JmxMixin
 from kafkatest.utils.security_config import SecurityConfig
 import json
 import re
 import signal
 import time

-class KafkaService(Service):
+class KafkaService(JmxMixin, Service):

     logs = {
         "kafka_log": {
@@ -34,13 +34,15 @@ class KafkaService(Service):
             "collect_default": False}
     }

-    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None):
+    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
+                 topics=None, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
         """
         :type context
         :type zk: ZookeeperService
         :type topics: dict
         """
-        super(KafkaService, self).__init__(context, num_nodes)
+        Service.__init__(self, context, num_nodes)
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         self.zk = zk
         if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL:
             self.security_config = SecurityConfig(SecurityConfig.SSL)
@@ -50,9 +52,10 @@ class KafkaService(Service):
         self.interbroker_security_protocol = interbroker_security_protocol
         self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093
         self.topics = topics
+        self.quota_config = quota_config

     def start(self):
-        super(KafkaService, self).start()
+        Service.start(self)

         # Create topics if necessary
         if self.topics is not None:
@@ -65,18 +68,19 @@ class KafkaService(Service):
     def start_node(self, node):
         props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 port = self.port, security_protocol = self.security_protocol,
+                                 port = self.port, security_protocol = self.security_protocol, quota_config=self.quota_config,
                                  interbroker_security_protocol=self.interbroker_security_protocol)
         self.logger.info("kafka.properties:")
         self.logger.info(props_file)
         node.account.create_file("/mnt/kafka.properties", props_file)
         self.security_config.setup_node(node)

-        cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
+        cmd = "JMX_PORT=%d /opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" % self.jmx_port
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log("/mnt/kafka.log") as monitor:
             node.account.ssh(cmd)
             monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
+        self.start_jmx_tool(self.idx(node), node)

         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % str(node))
@@ -106,6 +110,7 @@ class KafkaService(Service):
         node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)

     def clean_node(self, node):
+        JmxMixin.clean_node(self, node)
         node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
         self.security_config.clean_node(node)
@@ -242,3 +247,7 @@ class KafkaService(Service):
         """Get the broker list to connect to Kafka using the specified security protocol
         """
         return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
+
+    def read_jmx_output_all_nodes(self):
+        for node in self.nodes:
+            self.read_jmx_output(self.idx(node), node)

tests/kafkatest/services/performance/jmx_mixin.py (new file)

@@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class JmxMixin(object):

    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
        self.jmx_object_names = jmx_object_names
        self.jmx_attributes = jmx_attributes
        self.jmx_port = 9192

        self.started = [False] * num_nodes
        self.jmx_stats = [{} for x in range(num_nodes)]
        self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
        self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time

    def clean_node(self, node):
        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)

    def start_jmx_tool(self, idx, node):
        if self.started[idx-1] == True or self.jmx_object_names == None:
            return
        self.started[idx-1] = True

        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \
              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
        for jmx_object_name in self.jmx_object_names:
            cmd += " --object-name %s" % jmx_object_name
        for jmx_attribute in self.jmx_attributes:
            cmd += " --attributes %s" % jmx_attribute
        cmd += " | tee -a /mnt/jmx_tool.log"

        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
        jmx_output.next()

    def read_jmx_output(self, idx, node):
        if self.started[idx-1] == False:
            return
        self.maximum_jmx_value = {}
        self.average_jmx_value = {}

        object_attribute_names = []

        cmd = "cat /mnt/jmx_tool.log"
        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
        for line in node.account.ssh_capture(cmd, allow_fail=False):
            if "time" in line:
                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
                continue
            stats = [float(field) for field in line.split(',')]
            time_sec = int(stats[0]/1000)
            self.jmx_stats[idx-1][time_sec] = {name: stats[i+1] for i, name in enumerate(object_attribute_names)}

        # do not calculate average and maximum of jmx stats until we have read output from all nodes
        if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
            return

        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])

        for name in object_attribute_names:
            aggregates_per_time = []
            for time_sec in xrange(start_time_sec, end_time_sec+1):
                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
                aggregates_per_time.append(sum(values_per_node))
            self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time)
            self.maximum_jmx_value[name] = max(aggregates_per_time)
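To make the aggregation in read_jmx_output concrete, here is a standalone sketch with toy numbers (an editorial illustration, not part of the patch). Per-second samples from each node are summed across nodes, then the average and maximum are taken over time, with missing samples treated as 0:

name = 'BytesInPerSec:OneMinuteRate'
# shape of JmxMixin.jmx_stats: one dict per node, mapping time_sec -> {attribute name: value}
jmx_stats = [
    {10: {name: 100.0}, 11: {name: 400.0}},  # node 1
    {10: {name: 200.0}},                      # node 2: no sample at t=11, counted as 0
]
start_time_sec = min(min(t.keys()) for t in jmx_stats)   # 10
end_time_sec = max(max(t.keys()) for t in jmx_stats)     # 11
aggregates_per_time = [sum(t.get(sec, {}).get(name, 0) for t in jmx_stats)
                       for sec in range(start_time_sec, end_time_sec + 1)]  # [300.0, 400.0]
print(max(aggregates_per_time))                             # maximum_jmx_value[name] -> 400.0
print(sum(aggregates_per_time) / len(aggregates_per_time))  # average_jmx_value[name] -> 350.0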

tests/kafkatest/services/performance/producer_performance.py

@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from kafkatest.services.performance.jmx_mixin import JmxMixin
 from kafkatest.services.performance import PerformanceService
+import itertools
 from kafkatest.utils.security_config import SecurityConfig

-class ProducerPerformanceService(PerformanceService):
+class ProducerPerformanceService(JmxMixin, PerformanceService):

     logs = {
         "producer_performance_log": {
@@ -25,8 +26,10 @@ class ProducerPerformanceService(PerformanceService):
             "collect_default": True},
     }

-    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
-        super(ProducerPerformanceService, self).__init__(context, num_nodes)
+    def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={},
+                 intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
+        PerformanceService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.security_config = SecurityConfig(security_protocol)
         self.security_protocol = security_protocol
@@ -38,12 +41,13 @@ class ProducerPerformanceService(PerformanceService):
         }
         self.settings = settings
         self.intermediate_stats = intermediate_stats
+        self.client_id = client_id

     def _worker(self, idx, node):
         args = self.args.copy()
-        args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
-        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
-              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
+        args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
+        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance " \
+              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args

         self.security_config.setup_node(node)
         if self.security_protocol == SecurityConfig.SSL:
@@ -68,7 +72,10 @@ class ProducerPerformanceService(PerformanceService):
                 'latency_999th_ms': float(parts[7].split()[0]),
             }
         last = None
-        for line in node.account.ssh_capture(cmd):
+        producer_output = node.account.ssh_capture(cmd)
+        first_line = producer_output.next()
+        self.start_jmx_tool(idx, node)
+        for line in itertools.chain([first_line], producer_output):
             if self.intermediate_stats:
                 try:
                     self.stats[idx-1].append(parse_stats(line))
@@ -81,3 +88,4 @@ class ProducerPerformanceService(PerformanceService):
             self.results[idx-1] = parse_stats(last)
         except:
             raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
+        self.read_jmx_output(idx, node)

tests/kafkatest/services/templates/console_consumer.properties

@@ -17,3 +17,10 @@
 {% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
 consumer.timeout.ms={{ consumer_timeout_ms }}
 {% endif %}
+
+group.id={{ group_id|default('test-consumer-group') }}
+
+{% if client_id is defined and client_id is not none %}
+client.id={{ client_id }}
+{% endif %}
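For example, with the arguments the quota test below passes to ConsoleConsumer (consumer_timeout_ms=60000, client_id='overridden_id') and no group_id supplied, this part of the template renders to:

consumer.timeout.ms=60000
group.id=test-consumer-group
client.id=overridden_id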

tests/kafkatest/services/templates/kafka.properties

@@ -47,6 +47,22 @@ log.cleaner.enable=false
 zookeeper.connect={{ zk.connect_setting() }}
 zookeeper.connection.timeout.ms=2000

+{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
+quota.producer.default={{ quota_config.quota_producer_default }}
+{% endif %}
+
+{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default is not none %}
+quota.consumer.default={{ quota_config.quota_consumer_default }}
+{% endif %}
+
+{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %}
+quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }}
+{% endif %}
+
+{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides is not none %}
+quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
+{% endif %}
+
 security.inter.broker.protocol={{ interbroker_security_protocol }}
 ssl.keystore.location=/mnt/ssl/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
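With the quota_config dictionary that quota_test.py (below) passes to KafkaService, these template lines render to:

quota.producer.default=2500000
quota.consumer.default=2000000
quota.producer.bytes.per.second.overrides=overridden_id=3750000
quota.consumer.bytes.per.second.overrides=overridden_id=3000000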

tests/kafkatest/tests/quota_test.py (new file)

@@ -0,0 +1,180 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer, is_int
import random
import signal
import time
class QuotaTest(Test):
    """
    These tests verify that quota provides expected functionality -- they run
    producer, broker, and consumer with different clientId and quota configuration and
    check that the observed throughput is close to the value we expect.
    """

    def __init__(self, test_context):
        """:type test_context: ducktape.tests.test.TestContext"""
        super(QuotaTest, self).__init__(test_context=test_context)

        self.topic = 'test_topic'
        self.logger.info('use topic ' + self.topic)

        # quota related parameters
        self.quota_config = {'quota_producer_default': 2500000,
                             'quota_consumer_default': 2000000,
                             'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
                             'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
        self.maximum_client_deviation_percentage = 100.0
        self.maximum_broker_deviation_percentage = 5.0
        self.num_records = 100000
        self.record_size = 3000

        self.security_protocol = 'PLAINTEXT'
        self.interbroker_security_protocol = 'PLAINTEXT'
        self.zk = ZookeeperService(test_context, num_nodes=1)
        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                  security_protocol=self.security_protocol,
                                  interbroker_security_protocol=self.interbroker_security_protocol,
                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}},
                                  quota_config=self.quota_config,
                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
                                  jmx_attributes=['OneMinuteRate'])
        self.num_producers = 1
        self.num_consumers = 2

    def setUp(self):
        self.zk.start()
        self.kafka.start()

    def min_cluster_size(self):
        """Override this since we're adding services outside of the constructor"""
        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers

    def run_clients(self, producer_id, producer_num, consumer_id, consumer_num):
        # Produce all messages
        producer = ProducerPerformanceService(
            self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol,
            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id,
            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate'])
        producer.run()

        # Consume all messages
        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
                                   security_protocol=self.security_protocol, new_consumer=False,
                                   consumer_timeout_ms=60000, client_id=consumer_id,
                                   jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id],
                                   jmx_attributes=['OneMinuteRate'])
        consumer.run()

        for idx, messages in consumer.messages_consumed.iteritems():
            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx

        success, msg = self.validate(self.kafka, producer, consumer)
        assert success, msg

    def validate(self, broker, producer, consumer):
        """
        For each client_id we validate that:
        1) number of consumed messages equals number of produced messages
        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
        """
        success = True
        msg = ''
        self.kafka.read_jmx_output_all_nodes()

        # validate that number of consumed messages equals number of produced messages
        produced_num = sum([value['records'] for value in producer.results])
        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
        self.logger.info('producer produced %d messages' % produced_num)
        self.logger.info('consumer consumed %d messages' % consumed_num)
        if produced_num != consumed_num:
            success = False
            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)

        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
        producer_quota_bps = self.get_producer_quota(producer.client_id)
        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
            success = False
            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)

        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
                         (broker_maximum_byte_in_bps, producer_quota_bps))
        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
            success = False
            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)

        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
        consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate' % consumer.client_id
        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
        consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
            success = False
            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)

        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
                         (broker_maximum_byte_out_bps, consumer_quota_bps))
        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
            success = False
            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)

        return success, msg

    def get_producer_quota(self, client_id):
        overridden_quotas = {value.split('=')[0]: value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
        if client_id in overridden_quotas:
            return float(overridden_quotas[client_id])
        return self.quota_config['quota_producer_default']

    def get_consumer_quota(self, client_id):
        overridden_quotas = {value.split('=')[0]: value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
        if client_id in overridden_quotas:
            return float(overridden_quotas[client_id])
        return self.quota_config['quota_consumer_default']

    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
        self.run_clients(producer_id, producer_num, consumer_id, consumer_num)
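A quick sanity check of the bounds in validate, using values from the quota_config above (this walkthrough is editorial, not part of the patch): for the overridden_id case the producer quota is 3750000 bytes/sec, so the client-side check tolerates up to 3750000 * (100.0/100 + 1) = 7500000 bytes/sec, while the broker byte-in check tolerates only 3750000 * (5.0/100 + 1) = 3937500 bytes/sec. The much looser client bound presumably allows the producer's own sampled rate to overshoot briefly while the broker throttles it, whereas the broker-side one-minute rate is expected to track the enforced quota closely.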