diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 277f8ed264c..139078931d1 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.utils.util import wait_until from ducktape.mark import matrix from ducktape.mark import parametrize from ducktape.mark.resource import cluster @@ -20,9 +21,12 @@ from ducktape.services.service import Service from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput +from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, ShareConsumerPerformanceService, throughput, latency, compute_aggregate_throughput +from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH, KafkaVersion +import os + TOPIC_REP_ONE = "topic-replication-factor-one" TOPIC_REP_THREE = "topic-replication-factor-three" DEFAULT_RECORD_SIZE = 100 # bytes @@ -232,6 +236,71 @@ class Benchmark(Test): str(data)] self.logger.info("\n".join(summary)) return data + + @cluster(num_nodes=8) + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], + compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True]) + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], + use_share_groups=[True]) + def test_producer_and_share_consumer(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None, + interbroker_security_protocol=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH), + metadata_quorum=quorum.isolated_kraft, use_share_groups=True): + """ + Setup: 3 node kafka cluster + Concurrently produce and consume 1e6 messages with a single producer and a single share consumer, + + Return aggregate throughput statistics for both producer and share consumer. + + (Under the hood, this runs ProducerPerformance.java, and ShareConsumerPerformance.java) + """ + client_version = KafkaVersion(client_version) + broker_version = KafkaVersion(broker_version) + self.validate_versions(client_version, broker_version) + if interbroker_security_protocol is None: + interbroker_security_protocol = security_protocol + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version) + num_records = 1000 * 1000 # 1e6 + + self.producer = ProducerPerformanceService( + self.test_context, 1, self.kafka, + topic=TOPIC_REP_THREE, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version, + settings={ + 'acks': 1, + 'compression.type': compression_type, + 'batch.size': self.batch_size, + 'buffer.memory': self.buffer_memory + } + ) + + share_group = "perf-share-consumer" + + kafka_node = self.kafka.nodes[0] + PERSISTENT_ROOT = "/mnt/share_consumer_performance" + COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties") + + if security_protocol is not SecurityConfig.PLAINTEXT: + prop_file = str(self.kafka.security_config.client_config()) + self.logger.debug(prop_file) + kafka_node.account.ssh("mkdir -p %s" % PERSISTENT_ROOT, allow_fail=False) + kafka_node.account.create_file(COMMAND_CONFIG_FILE, prop_file) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=share_group, strategy="earliest", command_config=COMMAND_CONFIG_FILE), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") + + self.share_consumer = ShareConsumerPerformanceService( + self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, messages=num_records, group=share_group, timeout=20000) + Service.run_parallel(self.producer, self.share_consumer) + + data = { + "producer": compute_aggregate_throughput(self.producer), + "share_consumer": compute_aggregate_throughput(self.share_consumer) + } + summary = [ + "Producer + share_consumer:", + str(data)] + self.logger.info("\n".join(summary)) + return data @cluster(num_nodes=8) @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], @@ -273,6 +342,62 @@ class Benchmark(Test): self.consumer.group = "test-consumer-group" self.consumer.run() return compute_aggregate_throughput(self.consumer) + + @cluster(num_nodes=8) + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], + compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True]) + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], + use_share_groups=[True]) + def test_share_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None, + interbroker_security_protocol=None, num_consumers=1, client_version=str(DEV_BRANCH), + broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft, use_share_groups=True): + """ + Consume 1e6 100-byte messages with 1 or more consumers from a topic with 6 partitions + and report throughput. + """ + client_version = KafkaVersion(client_version) + broker_version = KafkaVersion(broker_version) + self.validate_versions(client_version, broker_version) + if interbroker_security_protocol is None: + interbroker_security_protocol = security_protocol + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version) + num_records = 1000 * 1000 # 1e6 + + # seed kafka w/messages + self.producer = ProducerPerformanceService( + self.test_context, 1, self.kafka, + topic=TOPIC_REP_THREE, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version, + settings={ + 'acks': 1, + 'compression.type': compression_type, + 'batch.size': self.batch_size, + 'buffer.memory': self.buffer_memory + } + ) + self.producer.run() + + share_group = "test-share-consumer-group" + + kafka_node = self.kafka.nodes[0] + PERSISTENT_ROOT = "/mnt/share_consumer_performance" + COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties") + + if security_protocol is not SecurityConfig.PLAINTEXT: + prop_file = str(self.kafka.security_config.client_config()) + self.logger.debug(prop_file) + kafka_node.account.ssh("mkdir -p %s" % PERSISTENT_ROOT, allow_fail=False) + kafka_node.account.create_file(COMMAND_CONFIG_FILE, prop_file) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=share_group, strategy="earliest", command_config=COMMAND_CONFIG_FILE), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") + + # consume + self.share_consumer = ShareConsumerPerformanceService( + self.test_context, num_consumers, self.kafka, + topic=TOPIC_REP_THREE, messages=num_records, group=share_group, timeout=20000) + self.share_consumer.run() + return compute_aggregate_throughput(self.share_consumer) def validate_versions(self, client_version, broker_version): assert client_version <= broker_version, "Client version %s should be <= than broker version %s" (client_version, broker_version) diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py index 239c173a509..79ab6a6c5a8 100644 --- a/tests/kafkatest/sanity_checks/test_performance_services.py +++ b/tests/kafkatest/sanity_checks/test_performance_services.py @@ -13,14 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.utils.util import wait_until from ducktape.mark import matrix, parametrize from ducktape.mark.resource import cluster from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService +from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService, ShareConsumerPerformanceService from kafkatest.services.performance import latency, compute_aggregate_throughput -from kafkatest.version import DEV_BRANCH, LATEST_2_1, KafkaVersion +from kafkatest.version import DEV_BRANCH, LATEST_2_1, V_4_1_0, KafkaVersion class PerformanceServiceTest(Test): @@ -30,9 +31,10 @@ class PerformanceServiceTest(Test): self.num_records = 10000 self.topic = "topic" - @cluster(num_nodes=5) - @matrix(version=[str(LATEST_2_1), str(DEV_BRANCH)], metadata_quorum=quorum.all_kraft) - def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk): + @cluster(num_nodes=6) + @matrix(version=[str(LATEST_2_1)], metadata_quorum=quorum.all_kraft) + @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_kraft, use_share_groups=[True]) + def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk, use_share_groups=False): """ Sanity check out producer performance service - verify that we can run the service with a small number of messages. The actual stats here are pretty meaningless since the number of messages is quite small. @@ -73,8 +75,24 @@ class PerformanceServiceTest(Test): consumer_perf_data = compute_aggregate_throughput(self.consumer_perf) assert consumer_perf_data['records_per_sec'] > 0 - return { + results = { "producer_performance": producer_perf_data, "end_to_end_latency": end_to_end_data, - "consumer_performance": consumer_perf_data + "consumer_performance": consumer_perf_data, } + + if version >= V_4_1_0: + # check basic run of share consumer performance service + self.share_consumer_perf = ShareConsumerPerformanceService( + self.test_context, 1, self.kafka, + topic=self.topic, version=version, messages=self.num_records) + share_group = "test-share-consumer-group" + self.share_consumer_perf.group = share_group + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=share_group, strategy="earliest"), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") + self.share_consumer_perf.run() + share_consumer_perf_data = compute_aggregate_throughput(self.share_consumer_perf) + assert share_consumer_perf_data['records_per_sec'] > 0 + results["share_consumer_performance"] = share_consumer_perf_data + + return results diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 79c2fb81b26..88f7fc24c66 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1728,6 +1728,29 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): return False return True + def set_share_group_offset_reset_strategy(self, group, strategy=None, node=None, command_config=None): + """ Set the offset reset strategy config for the given group. + """ + if strategy is None: + return + if node is None: + node = self.nodes[0] + config_script = self.path.script("kafka-configs.sh", node) + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + cmd = fix_opts_for_new_jvm(node) + cmd += "%s --bootstrap-server %s --group %s --alter --add-config \"share.auto.offset.reset=%s\" %s" % \ + (config_script, + self.bootstrap_servers(self.security_protocol), + group, + strategy, + command_config) + return "Completed" in self.run_cli_tool(node, cmd) + def list_consumer_groups(self, node=None, command_config=None, state=None, type=None): """ Get list of consumer groups. """ @@ -1750,7 +1773,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): if type is not None: cmd += " --type %s" % type return self.run_cli_tool(node, cmd) - + def list_share_groups(self, node=None, command_config=None, state=None): """ Get list of share groups. """ diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py index 69686f75051..fdfafbbabff 100644 --- a/tests/kafkatest/services/performance/__init__.py +++ b/tests/kafkatest/services/performance/__init__.py @@ -17,3 +17,4 @@ from .performance import PerformanceService, throughput, latency, compute_aggreg from .end_to_end_latency import EndToEndLatencyService from .producer_performance import ProducerPerformanceService from .consumer_performance import ConsumerPerformanceService +from .share_consumer_performance import ShareConsumerPerformanceService diff --git a/tests/kafkatest/services/performance/share_consumer_performance.py b/tests/kafkatest/services/performance/share_consumer_performance.py new file mode 100644 index 00000000000..ccb09524580 --- /dev/null +++ b/tests/kafkatest/services/performance/share_consumer_performance.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os + +from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config_for_tools +from kafkatest.services.performance import PerformanceService +from kafkatest.version import DEV_BRANCH + + +class ShareConsumerPerformanceService(PerformanceService): + """ + See ShareConsumerPerformance tool as the source of truth on these settings, but for reference: + + "topic", "REQUIRED: The topic to consume from." + + "group", "The group id to consume on." + + "fetch-size", "The amount of data to fetch in a single request." + + "socket-buffer-size", "The size of the tcp RECV size." + + "consumer.config", "Consumer config properties file." + """ + + # Root directory for persistent output + PERSISTENT_ROOT = "/mnt/share_consumer_performance" + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "share_consumer_performance.stdout") + STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "share_consumer_performance.stderr") + LOG_FILE = os.path.join(LOG_DIR, "share_consumer_performance.log") + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "share_consumer.properties") + + logs = { + "share_consumer_performance_output": { + "path": STDOUT_CAPTURE, + "collect_default": True}, + "share_consumer_performance_stderr": { + "path": STDERR_CAPTURE, + "collect_default": True}, + "share_consumer_performance_log": { + "path": LOG_FILE, + "collect_default": True} + } + + def __init__(self, context, num_nodes, kafka, topic, messages, group="perf-share-consumer", version=DEV_BRANCH, timeout=10000, settings={}): + super(ShareConsumerPerformanceService, self).__init__(context, num_nodes) + self.kafka = kafka + self.security_config = kafka.security_config.client_config() + self.topic = topic + self.messages = messages + self.settings = settings + self.group = group + self.timeout = timeout + + # These less-frequently used settings can be updated manually after instantiation + self.fetch_size = None + self.socket_buffer_size = None + + for node in self.nodes: + node.version = version + + def args(self): + """Dictionary of arguments used to start the Share Consumer Performance script.""" + args = { + 'topic': self.topic, + 'messages': self.messages, + 'bootstrap-server': self.kafka.bootstrap_servers(self.security_config.security_protocol), + 'group': self.group, + 'timeout': self.timeout + } + + if self.fetch_size is not None: + args['fetch-size'] = self.fetch_size + + if self.socket_buffer_size is not None: + args['socket-buffer-size'] = self.socket_buffer_size + + return args + + def start_cmd(self, node): + cmd = fix_opts_for_new_jvm(node) + cmd += "export LOG_DIR=%s;" % ShareConsumerPerformanceService.LOG_DIR + cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\";" % (get_log4j_config_param(node), get_log4j_config_for_tools(node)) + cmd += " %s" % self.path.script("kafka-share-consumer-perf-test.sh", node) + for key, value in self.args().items(): + cmd += " --%s %s" % (key, value) + + cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE + + for key, value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + + cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': ShareConsumerPerformanceService.STDOUT_CAPTURE, + 'stderr': ShareConsumerPerformanceService.STDERR_CAPTURE} + return cmd + + def _worker(self, idx, node): + node.account.ssh("mkdir -p %s" % ShareConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False) + + log_config = self.render(get_log4j_config_for_tools(node), log_file=ShareConsumerPerformanceService.LOG_FILE) + node.account.create_file(get_log4j_config_for_tools(node), log_config) + node.account.create_file(ShareConsumerPerformanceService.CONFIG_FILE, str(self.security_config)) + self.security_config.setup_node(node) + + cmd = self.start_cmd(node) + self.logger.debug("Share consumer performance %d command: %s", idx, cmd) + last = None + for line in node.account.ssh_capture(cmd): + last = line + + # Parse and save the last line's information + if last is not None: + parts = last.split(',') + self.results[idx-1] = { + 'total_mb': float(parts[2]), + 'mbps': float(parts[3]), + 'records_per_sec': float(parts[5]), + }