mirror of https://github.com/apache/kafka.git

KAFKA-3558; Add compression_type parameter to benchmarks in benchmark_test.py

* Use a fixed `Random` seed in `EndToEndLatency.scala` for determinism.
* Add `compression_type` to and remove `consumer_fetch_max_wait` from `end_to_end_latency.py`. The latter was never used.
* Tweak logging of `end_to_end_latency.py` to be similar to `consumer_performance.py`.
* Add `compression_type` to the `benchmark_test.py` methods and add `snappy` to the `matrix` annotations.
* Use randomly generated bytes from a restricted range for the `ProducerPerformance` payload. This is a simple fix for now; it can be improved in the PR for KAFKA-3554.

Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes #1225 from ijuma/kafka-3558-add-compression_type-benchmark_test.py

parent a81ad2582e
commit a5f1158c31
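The payload changes in `EndToEndLatency.scala` and `ProducerPerformance.java` below both draw bytes from a restricted range with a fixed seed. A rough Python sketch of the same idea (illustrative only, not the benchmark code; the helper name is made up):

```python
import random

def random_payload(length, seed=0):
    """Deterministic payload of bytes in the range 65..90 ('A'..'Z').

    A fixed seed makes repeated benchmark runs comparable, and a restricted
    byte range keeps the payload compressible without being a constant fill.
    """
    rng = random.Random(seed)
    return bytes(rng.randint(65, 90) for _ in range(length))

# Two runs with the same seed produce identical payloads.
assert random_payload(100) == random_payload(100)
```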
ConsumerPerformance.scala

@@ -19,8 +19,6 @@ package kafka.tools

 import java.util
 import org.apache.kafka.common.TopicPartition
 import scala.collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException

@@ -85,10 +83,9 @@ object ConsumerPerformance {
       thread.start
     for (thread <- threadList)
       thread.join
-    if(consumerTimeout.get())
-      endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs
-    else
-      endMs = System.currentTimeMillis
+    endMs =
+      if (consumerTimeout.get()) System.currentTimeMillis - consumerConfig.consumerTimeoutMs
+      else System.currentTimeMillis
     consumerConnector.shutdown()
   }
   val elapsedSecs = (endMs - startMs) / 1000.0

@@ -279,9 +276,8 @@ object ConsumerPerformance {
     } catch {
       case _: InterruptedException =>
       case _: ClosedByInterruptException =>
-      case _: ConsumerTimeoutException => {
-        consumerTimeout.set(true);
-      }
+      case _: ConsumerTimeoutException =>
+        consumerTimeout.set(true)
       case e: Throwable => e.printStackTrace()
     }
     totalMessagesRead.addAndGet(messagesRead)

EndToEndLatency.scala

@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.TopicPartition

 import scala.collection.JavaConversions._
+import scala.util.Random


 /**

@@ -43,7 +44,7 @@ object EndToEndLatency {

  def main(args: Array[String]) {
    if (args.length != 5 && args.length != 6) {
-     System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file")
+     System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file")
      System.exit(1)
    }

@@ -52,12 +53,14 @@ object EndToEndLatency {
    val numMessages = args(2).toInt
    val producerAcks = args(3)
    val messageLen = args(4).toInt
-   val sslPropsFile = if (args.length == 6) args(5) else ""
+   val propsFile = if (args.length > 5) Some(args(5)).filter(_.nonEmpty) else None

    if (!List("1", "all").contains(producerAcks))
      throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")

-   val consumerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+   def loadProps: Properties = propsFile.map(Utils.loadProps).getOrElse(new Properties())
+
+   val consumerProps = loadProps
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

@@ -69,7 +72,7 @@ object EndToEndLatency {
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
    consumer.subscribe(List(topic))

-   val producerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+   val producerProps = loadProps
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)

@@ -91,9 +94,10 @@ object EndToEndLatency {

    var totalTime = 0.0
    val latencies = new Array[Long](numMessages)
+   val random = new Random(0)

    for (i <- 0 until numMessages) {
-     val message = randomBytesOfLen(messageLen)
+     val message = randomBytesOfLen(random, messageLen)
      val begin = System.nanoTime

      //Send message (of random bytes) synchronously then immediately poll for it

@@ -141,7 +145,7 @@ object EndToEndLatency {
      finalise()
    }

- def randomBytesOfLen(len: Int): Array[Byte] = {
-   Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
+ def randomBytesOfLen(random: Random, len: Int): Array[Byte] = {
+   Array.fill(len)((random.nextInt(26) + 65).toByte)
  }
 }

benchmark_test.py

@@ -68,9 +68,10 @@ class Benchmark(Test):
    @parametrize(acks=1, topic=TOPIC_REP_THREE)
    @parametrize(acks=-1, topic=TOPIC_REP_THREE)
    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
-   @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
-   def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT',
-                                client_version=str(TRUNK), broker_version=str(TRUNK)):
+   @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
+   def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
+                                compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
+                                broker_version=str(TRUNK)):
        """
        Setup: 1 node zk + 3 node kafka cluster
        Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,

@@ -91,15 +92,17 @@ class Benchmark(Test):
            num_records=nrecords, record_size=message_size, throughput=-1, version=client_version,
            settings={
                'acks': acks,
+               'compression.type': compression_type,
                'batch.size': self.batch_size,
                'buffer.memory': self.buffer_memory})
        self.producer.run()
        return compute_aggregate_throughput(self.producer)

    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-   @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-   def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None,
-                                          client_version=str(TRUNK), broker_version=str(TRUNK)):
+   @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+   def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
+                                          interbroker_security_protocol=None, client_version=str(TRUNK),
+                                          broker_version=str(TRUNK)):
        """
        Setup: 1 node zk + 3 node kafka cluster
        Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.

@@ -117,7 +120,12 @@ class Benchmark(Test):
        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
-           throughput=-1, version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+           throughput=-1, version=client_version, settings={
+               'acks': 1,
+               'compression.type': compression_type,
+               'batch.size': self.batch_size,
+               'buffer.memory': self.buffer_memory
+           },
            intermediate_stats=True
        )
        self.producer.run()

@@ -146,9 +154,10 @@ class Benchmark(Test):
        return data

    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-   @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-   def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None,
-                               client_version=str(TRUNK), broker_version=str(TRUNK)):
+   @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
+   def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
+                               interbroker_security_protocol=None, client_version=str(TRUNK),
+                               broker_version=str(TRUNK)):
        """
        Setup: 1 node zk + 3 node kafka cluster
        Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,

@@ -167,15 +176,17 @@ class Benchmark(Test):
        self.logger.info("BENCHMARK: End to end latency")
        self.perf = EndToEndLatencyService(
            self.test_context, 1, self.kafka,
-           topic=TOPIC_REP_THREE, num_records=10000, version=client_version
+           topic=TOPIC_REP_THREE, num_records=10000,
+           compression_type=compression_type, version=client_version
        )
        self.perf.run()
        return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])

    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-   @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-   def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True,
+   @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+   def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
+                                  interbroker_security_protocol=None, new_consumer=True,
                                   client_version=str(TRUNK), broker_version=str(TRUNK)):
        """
        Setup: 1 node zk + 3 node kafka cluster

@@ -198,7 +209,12 @@ class Benchmark(Test):
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE,
            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
-           settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+           settings={
+               'acks': 1,
+               'compression.type': compression_type,
+               'batch.size': self.batch_size,
+               'buffer.memory': self.buffer_memory
+           }
        )
        self.consumer = ConsumerPerformanceService(
            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)

@@ -216,8 +232,9 @@ class Benchmark(Test):

    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-   @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-   def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
+   @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+   def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
+                                interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
                                 client_version=str(TRUNK), broker_version=str(TRUNK)):
        """
        Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions

@@ -236,7 +253,12 @@ class Benchmark(Test):
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE,
            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
-           settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+           settings={
+               'acks': 1,
+               'compression.type': compression_type,
+               'batch.size': self.batch_size,
+               'buffer.memory': self.buffer_memory
+           }
        )
        self.producer.run()

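Each `@matrix` annotation above now carries a `compression_type` axis, so every listed combination of security protocol, message size, and so on runs once per compression type. A rough sketch of how list-valued parameters multiply into test cases (plain `itertools`, not ducktape's actual implementation; the helper name is made up):

```python
from itertools import product

def expand_matrix(**axes):
    """Yield one kwargs dict per point in the cartesian product of the axes."""
    names = sorted(axes)
    for values in product(*(axes[name] for name in names)):
        yield dict(zip(names, values))

# e.g. the test_consumer_throughput matrix: 2 protocols x 2 compression types = 4 cases
cases = list(expand_matrix(security_protocol=['PLAINTEXT', 'SSL'],
                           compression_type=["none", "snappy"]))
assert len(cases) == 4
```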
end_to_end_latency.py

@@ -17,32 +17,53 @@ from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig

 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0, V_0_10_0_0
+
+import os


 class EndToEndLatencyService(PerformanceService):
     MESSAGE_BYTES = 21  # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions

+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/end_to_end_latency"
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "end_to_end_latency.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "end_to_end_latency.stderr")
+    LOG_FILE = os.path.join(LOG_DIR, "end_to_end_latency.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "client.properties")
+
     logs = {
-        "end_to_end_latency_log": {
-            "path": "/mnt/end-to-end-latency.log",
+        "end_to_end_latency_output": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": True},
+        "end_to_end_latency_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": True},
+        "end_to_end_latency_log": {
+            "path": LOG_FILE,
             "collect_default": True}
     }

-    def __init__(self, context, num_nodes, kafka, topic, num_records, version=TRUNK, consumer_fetch_max_wait=100, acks=1):
+    def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=TRUNK, acks=1):
         super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()

         security_protocol = self.security_config.security_protocol
-        assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
-            "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+
+        if version < V_0_9_0_0:
+            assert security_protocol == SecurityConfig.PLAINTEXT, \
+                "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
+            assert compression_type == "none", \
+                "Compression type %s is only supported if version >= 0.9.0.0, version %s" % (compression_type, str(version))

         self.args = {
             'topic': topic,
             'num_records': num_records,
-            'consumer_fetch_max_wait': consumer_fetch_max_wait,
             'acks': acks,
+            'compression_type': compression_type,
             'kafka_opts': self.security_config.kafka_opts,
             'message_bytes': EndToEndLatencyService.MESSAGE_BYTES
         }

@@ -50,56 +71,41 @@ class EndToEndLatencyService(PerformanceService):
         for node in self.nodes:
             node.version = version

-    @property
-    def security_config_file(self):
-        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
-            security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties"
-        else:
-            security_config_file = ""
-        return security_config_file
-
     def start_cmd(self, node):
         args = self.args.copy()
         args.update({
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
-            'security_config_file': self.security_config_file,
+            'config_file': EndToEndLatencyService.CONFIG_FILE,
             'kafka_dir': kafka_dir(node)
         })

+        cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
         if node.version >= V_0_9_0_0:
             """
             val brokerList = args(0)
             val topic = args(1)
             val numMessages = args(2).toInt
             val producerAcks = args(3)
             val messageLen = args(4).toInt
             """
-            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
-            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(security_config_file)s" % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
+            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
         else:
             """
             val brokerList = args(0)
             val zkConnect = args(1)
             val topic = args(2)
             val numMessages = args(3).toInt
             val consumerFetchMaxWait = args(4).toInt
             val producerAcks = args(5).toInt
             """

             # Set fetch max wait to 0 to match behavior in later versions
-            cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
             cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args

-        cmd += " | tee /mnt/end-to-end-latency.log"
+        cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': EndToEndLatencyService.STDOUT_CAPTURE,
+                                                        'stderr': EndToEndLatencyService.STDERR_CAPTURE}

         return cmd

     def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % EndToEndLatencyService.PERSISTENT_ROOT, allow_fail=False)
+
+        log_config = self.render('tools_log4j.properties', log_file=EndToEndLatencyService.LOG_FILE)
+        node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config)
+
+        client_config = str(self.security_config)
+        if node.version >= V_0_9_0_0:
+            client_config += "compression_type=%(compression_type)s" % self.args
+        node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config)

         self.security_config.setup_node(node)
-        if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
-            node.account.create_file(self.security_config_file, str(self.security_config))

         cmd = self.start_cmd(node)
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)

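The new `start_cmd` passes the rendered `client.properties` path to the tool as its last argument and redirects stdout/stderr into the new capture files. A rough sketch of how the `%`-template expands (illustrative values only; this is not the service code):

```python
# Hypothetical values standing in for self.args after args.update(...)
args = {
    'kafka_opts': '',
    'kafka_dir': 'kafka-trunk',
    'bootstrap_servers': 'worker1:9092',
    'topic': 'topic-replication-factor-three',
    'num_records': 10000,
    'acks': 1,
    'message_bytes': 21,
    'config_file': '/mnt/end_to_end_latency/client.properties',
}

cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
print(cmd)
# KAFKA_OPTS= /opt/kafka-trunk/bin/kafka-run-class.sh kafka.tools.EndToEndLatency worker1:9092
#   topic-replication-factor-three 10000 1 21 /mnt/end_to_end_latency/client.properties
```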
ProducerPerformance.java

@@ -17,6 +17,7 @@ import static net.sourceforge.argparse4j.impl.Arguments.store;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;

 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;

@@ -59,8 +60,10 @@ public class ProducerPerformance {

        /* setup perf test */
        byte[] payload = new byte[recordSize];
-       Arrays.fill(payload, (byte) 1);
-       ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
+       Random random = new Random(0);
+       for (int i = 0; i < payload.length; ++i)
+           payload[i] = (byte) (random.nextInt(26) + 65);
+       ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
        Stats stats = new Stats(numRecords, 5000);
        long startMs = System.currentTimeMillis();
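Replacing the constant `Arrays.fill(payload, (byte) 1)` with seeded random bytes in the range 65..90 matters for the new compression benchmarks: a constant payload compresses to almost nothing, while a 26-symbol alphabet still compresses, but at a more realistic ratio. A quick way to see the difference (Python with zlib as a stand-in codec; not snappy and not the benchmark code):

```python
import random
import zlib

constant = bytes([1]) * 1000
rng = random.Random(0)
restricted = bytes(rng.randint(65, 90) for _ in range(1000))

print(len(zlib.compress(constant)))    # roughly a dozen bytes: degenerate input
print(len(zlib.compress(restricted)))  # several hundred bytes: compressible, but not trivially so
```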