From a5f1158c317e22a79c4186d1acb04fb25ce6e56a Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Mon, 18 Apr 2016 14:23:46 -0700
Subject: [PATCH] KAFKA-3558; Add compression_type parameter to benchmarks in
 benchmark_test.py

* Use a fixed `Random` seed in `EndToEndLatency.scala` for determinism.
* Add `compression_type` to `end_to_end_latency.py` and remove
  `consumer_fetch_max_wait`, which was never used.
* Tweak the logging of `end_to_end_latency.py` to be similar to
  `consumer_performance.py`.
* Add `compression_type` to the `benchmark_test.py` methods and add `snappy`
  to the `matrix` annotations.
* Use randomly generated bytes from a restricted range for the
  `ProducerPerformance` payload. This is a simple fix for now; it can be
  improved in the PR for KAFKA-3554.

Author: Ismael Juma

Reviewers: Ewen Cheslack-Postava

Closes #1225 from ijuma/kafka-3558-add-compression_type-benchmark_test.py
---
 .../kafka/tools/ConsumerPerformance.scala          | 14 ++--
 .../scala/kafka/tools/EndToEndLatency.scala        | 18 ++--
 .../benchmarks/core/benchmark_test.py              | 56 +++++++++----
 .../performance/end_to_end_latency.py              | 84 ++++++++++---------
 .../kafka/tools/ProducerPerformance.java           |  7 +-
 5 files changed, 105 insertions(+), 74 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index a38c04b4ff3..6480ff5ae4d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -19,8 +19,6 @@ package kafka.tools
 
 import java.util
 
-import org.apache.kafka.common.TopicPartition
-
 import scala.collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
@@ -85,10 +83,9 @@ object ConsumerPerformance {
         thread.start
       for (thread <- threadList)
         thread.join
-      if(consumerTimeout.get())
-        endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs
-      else
-        endMs = System.currentTimeMillis
+      endMs =
+        if (consumerTimeout.get()) System.currentTimeMillis - consumerConfig.consumerTimeoutMs
+        else System.currentTimeMillis
       consumerConnector.shutdown()
     }
     val elapsedSecs = (endMs - startMs) / 1000.0
@@ -279,9 +276,8 @@ object ConsumerPerformance {
     } catch {
       case _: InterruptedException =>
       case _: ClosedByInterruptException =>
-      case _: ConsumerTimeoutException => {
-        consumerTimeout.set(true);
-      }
+      case _: ConsumerTimeoutException =>
+        consumerTimeout.set(true)
       case e: Throwable => e.printStackTrace()
     }
     totalMessagesRead.addAndGet(messagesRead)
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 584d4fb7ee7..1c920888c29 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConversions._
+import scala.util.Random
 
 
 /**
@@ -43,7 +44,7 @@ object EndToEndLatency {
 
   def main(args: Array[String]) {
     if (args.length != 5 && args.length != 6) {
-      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file")
+      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file")
       System.exit(1)
     }
 
@@ -52,12 +53,14 @@ object EndToEndLatency {
    val numMessages = args(2).toInt
    val producerAcks = args(3)
    val messageLen = args(4).toInt
-    val sslPropsFile = if (args.length == 6) args(5) else ""
+    val propsFile = if (args.length > 5) Some(args(5)).filter(_.nonEmpty) else None
 
    if (!List("1", "all").contains(producerAcks))
      throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
 
-    val consumerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+    def loadProps: Properties = propsFile.map(Utils.loadProps).getOrElse(new Properties())
+
+    val consumerProps = loadProps
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -69,7 +72,7 @@ object EndToEndLatency {
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
    consumer.subscribe(List(topic))
 
-    val producerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+    val producerProps = loadProps
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
@@ -91,9 +94,10 @@ object EndToEndLatency {
 
    var totalTime = 0.0
    val latencies = new Array[Long](numMessages)
+    val random = new Random(0)
 
    for (i <- 0 until numMessages) {
-      val message = randomBytesOfLen(messageLen)
+      val message = randomBytesOfLen(random, messageLen)
      val begin = System.nanoTime
 
      //Send message (of random bytes) synchronously then immediately poll for it
@@ -141,7 +145,7 @@ object EndToEndLatency {
      finalise()
    }
 
-  def randomBytesOfLen(len: Int): Array[Byte] = {
-    Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
+  def randomBytesOfLen(random: Random, len: Int): Array[Byte] = {
+    Array.fill(len)((random.nextInt(26) + 65).toByte)
  }
 }
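Note on the fixed seed above: constructing one `Random` with seed 0 makes
`randomBytesOfLen` emit the identical payload sequence on every run, so latency
numbers stay comparable across runs and across compression settings. A minimal
sketch of the idea, in Python for brevity (names here are illustrative, not
part of the patch):

    import random

    def random_bytes_of_len(rnd, length):
        # mirrors randomBytesOfLen: bytes restricted to uppercase ASCII (65..90)
        return bytes(rnd.randint(0, 25) + 65 for _ in range(length))

    # same seed => identical payloads on every run (21 matches MESSAGE_BYTES below)
    assert random_bytes_of_len(random.Random(0), 21) == random_bytes_of_len(random.Random(0), 21)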
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index d252e5dead2..83f4b2a23ad 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -68,9 +68,10 @@ class Benchmark(Test):
     @parametrize(acks=1, topic=TOPIC_REP_THREE)
     @parametrize(acks=-1, topic=TOPIC_REP_THREE)
     @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
-    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT',
-                                 client_version=str(TRUNK), broker_version=str(TRUNK)):
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
+    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
+                                 compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
+                                 broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
@@ -91,15 +92,17 @@ class Benchmark(Test):
             num_records=nrecords, record_size=message_size, throughput=-1, version=client_version,
             settings={
                 'acks': acks,
+                'compression.type': compression_type,
                 'batch.size': self.batch_size,
                 'buffer.memory': self.buffer_memory})
         self.producer.run()
         return compute_aggregate_throughput(self.producer)
 
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None,
-                                           client_version=str(TRUNK), broker_version=str(TRUNK)):
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
+                                           interbroker_security_protocol=None, client_version=str(TRUNK),
+                                           broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
@@ -117,7 +120,12 @@ class Benchmark(Test):
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
-            throughput=-1, version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+            throughput=-1, version=client_version, settings={
+                'acks': 1,
+                'compression.type': compression_type,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory
+            },
             intermediate_stats=True
         )
         self.producer.run()
@@ -146,9 +154,10 @@ class Benchmark(Test):
         return data
 
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None,
-                                client_version=str(TRUNK), broker_version=str(TRUNK)):
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
+    def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
+                                interbroker_security_protocol=None, client_version=str(TRUNK),
+                                broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
         Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
@@ -167,15 +176,17 @@ class Benchmark(Test):
         self.logger.info("BENCHMARK: End to end latency")
         self.perf = EndToEndLatencyService(
             self.test_context, 1, self.kafka,
-            topic=TOPIC_REP_THREE, num_records=10000, version=client_version
+            topic=TOPIC_REP_THREE, num_records=10000,
+            compression_type=compression_type, version=client_version
         )
         self.perf.run()
         return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
 
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True,
-                                   client_version=str(TRUNK), broker_version=str(TRUNK)):
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
+                                   interbroker_security_protocol=None, new_consumer=True,
+                                   client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Setup: 1 node zk + 3 node kafka cluster
@@ -198,7 +209,12 @@ class Benchmark(Test):
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE,
             num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
-            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+            settings={
+                'acks': 1,
+                'compression.type': compression_type,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory
+            }
         )
         self.consumer = ConsumerPerformanceService(
             self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
@@ -216,8 +232,9 @@ class Benchmark(Test):
 
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
+                                 interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
                                  client_version=str(TRUNK), broker_version=str(TRUNK)):
         """
         Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
@@ -236,7 +253,12 @@ class Benchmark(Test):
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE,
             num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
-            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+            settings={
+                'acks': 1,
+                'compression.type': compression_type,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory
+            }
         )
         self.producer.run()
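Note on the `@matrix` annotations above: ducktape parametrizes a test over the
cartesian product of the listed values, so adding `compression_type=["none",
"snappy"]` doubles the number of combinations in each matrix. A rough
standalone illustration of the expansion (not ducktape's actual
implementation):

    from itertools import product

    # e.g. test_consumer_throughput's matrix: 2 protocols x 2 codecs = 4 runs
    for security_protocol, compression_type in product(['PLAINTEXT', 'SSL'], ["none", "snappy"]):
        print("security_protocol=%s compression_type=%s" % (security_protocol, compression_type))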
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 08eff70cf1e..6d21151d530 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,32 +17,53 @@ from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0, V_0_10_0_0
+import os
 
 
 class EndToEndLatencyService(PerformanceService):
     MESSAGE_BYTES = 21 # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions
 
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/end_to_end_latency"
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "end_to_end_latency.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "end_to_end_latency.stderr")
+    LOG_FILE = os.path.join(LOG_DIR, "end_to_end_latency.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "client.properties")
+
     logs = {
-        "end_to_end_latency_log": {
-            "path": "/mnt/end-to-end-latency.log",
+        "end_to_end_latency_output": {
+            "path": STDOUT_CAPTURE,
             "collect_default": True},
+        "end_to_end_latency_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": True},
+        "end_to_end_latency_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, num_records, version=TRUNK, consumer_fetch_max_wait=100, acks=1):
+
+    def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=TRUNK, acks=1):
         super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
 
         security_protocol = self.security_config.security_protocol
-        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
-            "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+
+        if version < V_0_9_0_0:
+            assert security_protocol == SecurityConfig.PLAINTEXT, \
+                "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+            assert compression_type == "none", \
+                "Compression type %s is only supported if version >= 0.9.0.0, version %s" % (compression_type, str(version))
 
         self.args = {
             'topic': topic,
             'num_records': num_records,
-            'consumer_fetch_max_wait': consumer_fetch_max_wait,
             'acks': acks,
+            'compression_type': compression_type,
             'kafka_opts': self.security_config.kafka_opts,
             'message_bytes': EndToEndLatencyService.MESSAGE_BYTES
         }
@@ -50,56 +71,41 @@ class EndToEndLatencyService(PerformanceService):
         for node in self.nodes:
             node.version = version
 
-    @property
-    def security_config_file(self):
-        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
-            security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties"
-        else:
-            security_config_file = ""
-        return security_config_file
-
     def start_cmd(self, node):
         args = self.args.copy()
         args.update({
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
-            'security_config_file': self.security_config_file,
+            'config_file': EndToEndLatencyService.CONFIG_FILE,
             'kafka_dir': kafka_dir(node)
         })
+        cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
         if node.version >= V_0_9_0_0:
-            """
-            val brokerList = args(0)
-            val topic = args(1)
-            val numMessages = args(2).toInt
-            val producerAcks = args(3)
-            val messageLen = args(4).toInt
-            """
-
-            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
-            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(security_config_file)s" % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
+            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
         else:
-            """
-            val brokerList = args(0)
-            val zkConnect = args(1)
-            val topic = args(2)
-            val numMessages = args(3).toInt
-            val consumerFetchMaxWait = args(4).toInt
-            val producerAcks = args(5).toInt
-            """
             # Set fetch max wait to 0 to match behavior in later versions
-            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
             cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args
 
-        cmd += " | tee /mnt/end-to-end-latency.log"
+        cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': EndToEndLatencyService.STDOUT_CAPTURE,
+                                                        'stderr': EndToEndLatencyService.STDERR_CAPTURE}
 
         return cmd
 
     def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % EndToEndLatencyService.PERSISTENT_ROOT, allow_fail=False)
+
+        log_config = self.render('tools_log4j.properties', log_file=EndToEndLatencyService.LOG_FILE)
+
+        node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config)
+        client_config = str(self.security_config)
+        if node.version >= V_0_9_0_0:
+            client_config += "compression_type=%(compression_type)s" % self.args
+        node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config)
+
         self.security_config.setup_node(node)
-        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
-            node.account.create_file(self.security_config_file, str(self.security_config))
 
         cmd = self.start_cmd(node)
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
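Note on `start_cmd` above: the command line is assembled with `%`-style
formatting over the `args` dict, and the new-style tool receives the rendered
`client.properties` path as its final argument. A quick sketch of the
composition for a 0.9+ node (the placeholder values below are hypothetical;
real ones come from the service and the cluster):

    args = {
        'kafka_opts': '',
        'kafka_dir': 'kafka-trunk',           # hypothetical
        'bootstrap_servers': 'worker1:9092',  # hypothetical
        'topic': 'test-topic',                # hypothetical
        'num_records': 10000,
        'acks': 1,
        'message_bytes': 21,
        'config_file': '/mnt/end_to_end_latency/client.properties',
    }
    cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
    cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
    print(cmd)  # the composed shell command, matching the tool's USAGE string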
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 18daf09b62d..b83227f0173 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -17,6 +17,7 @@ import static net.sourceforge.argparse4j.impl.Arguments.store;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -59,8 +60,10 @@ public class ProducerPerformance {
 
             /* setup perf test */
             byte[] payload = new byte[recordSize];
-            Arrays.fill(payload, (byte) 1);
-            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
+            Random random = new Random(0);
+            for (int i = 0; i < payload.length; ++i)
+                payload[i] = (byte) (random.nextInt(26) + 65);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
             Stats stats = new Stats(numRecords, 5000);
             long startMs = System.currentTimeMillis();
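Note on the payload change above: the previous constant payload
(`Arrays.fill(payload, (byte) 1)`) is trivially compressible, which would
flatter any run with compression enabled, while fully random bytes are
essentially incompressible. Restricting bytes to the 26 uppercase letters
lands in between. A small sketch of the effect, using Python's zlib as a
stand-in codec (exact sizes will vary):

    import random
    import zlib

    rnd = random.Random(0)
    constant = bytes([1]) * 10000                                   # old payload: all ones
    letters = bytes(rnd.randint(0, 25) + 65 for _ in range(10000))  # new payload: 'A'..'Z'
    noise = bytes(rnd.randint(0, 255) for _ in range(10000))        # fully random

    for name, data in [("constant", constant), ("letters", letters), ("noise", noise)]:
        print(name, len(zlib.compress(data)))
    # constant shrinks to almost nothing, letters compress partially, noise barely at all

As the commit message notes, a more realistic payload distribution is left to
the follow-up PR for KAFKA-3554.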