KAFKA-2453: Enable new consumer in EndToEndLatency

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Gwen Shapira, Jason Gustafson

Closes #158 from benstopford/KAFKA-2453b
Authored by Ben Stopford on 2015-09-08 15:15:51 -07:00; committed by Gwen Shapira
parent f25731265e
commit 9262975661
2 changed files with 95 additions and 40 deletions
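The substance of the change: the old tool consumed via ZooKeeper with kafka.consumer and detected arrival with a blocking stream iterator, while the new tool uses the new KafkaConsumer, sends each message synchronously, then polls until the message comes back, timing the full round trip. A minimal sketch of that pattern, assuming a broker at localhost:9092 and a topic named test (both illustrative), written against the 0.9-era client API that appears in the diff below:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.collection.JavaConversions._

// Condensed sketch of one timed round trip, mirroring the patch below.
// Broker address and topic are illustrative placeholders.
object RoundTripSketch {
  def main(args: Array[String]) {
    val brokers = "localhost:9092"
    val topic = "test"

    val cProps = new Properties()
    cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "latency-sketch")
    cProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](cProps)
    consumer.subscribe(List(topic))
    consumer.seekToEnd() // lazy: only takes effect on the next poll
    consumer.poll(0)

    val pProps = new Properties()
    pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    pProps.put(ProducerConfig.ACKS_CONFIG, "1")
    pProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
    pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](pProps)

    val begin = System.nanoTime
    // Send one message synchronously, then block until it is read back.
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "ping".getBytes)).get()
    consumer.poll(60000)
    println("round trip: %.3f ms".format((System.nanoTime - begin) / 1000.0 / 1000.0))

    producer.close()
    consumer.close()
  }
}

Setting linger.ms to 0 and blocking on send().get() keeps exactly one message in flight per iteration, so the poll that follows can only return the message just sent, or time out.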

core/src/main/scala/kafka/tools/EndToEndLatency.scala

@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,74 +19,129 @@ package kafka.tools

 import java.util.{Arrays, Properties}

-import kafka.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-
-import scala.Option.option2Iterable
+import org.apache.kafka.clients.consumer.{CommitType, ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.JavaConversions._

+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */

 object EndToEndLatency {
+  private val timeout: Long = 60000
+
   def main(args: Array[String]) {
-    if (args.length != 6) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
+    println(args.length)
+    if (args.length != 5 && args.length != 6) {
+      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file")
       System.exit(1)
     }

     val brokerList = args(0)
-    val zkConnect = args(1)
-    val topic = args(2)
-    val numMessages = args(3).toInt
-    val consumerFetchMaxWait = args(4).toInt
-    val producerAcks = args(5).toInt
+    val topic = args(1)
+    val numMessages = args(2).toInt
+    val producerAcks = args(3)
+    val messageLen = args(4).toInt
+    val sslPropsFile = if (args.length == 6) args(5) else ""

-    val consumerProps = new Properties()
-    consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit.enable", "false")
-    consumerProps.put("auto.offset.reset", "largest")
-    consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
-    consumerProps.put("socket.timeout.ms", 1201000.toString)
-    val config = new ConsumerConfig(consumerProps)
-    val connector = Consumer.create(config)
-    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
-    val iter = stream.iterator
+    if (!List("1", "all").contains(producerAcks))
+      throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
+
+    val consumerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
+
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+    consumer.subscribe(List(topic))

-    val producerProps = new Properties()
+    val producerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)

-    // make sure the consumer fetcher has started before sending data since otherwise
-    // the consumption from the tail will skip the first message and hence be blocked
-    Thread.sleep(5000)
+    def finalise() {
+      consumer.commit(CommitType.SYNC)
+      producer.close()
+      consumer.close()
+    }
+
+    //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
+    //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
+    consumer.seekToEnd()
+    consumer.poll(0)

-    val message = "hello there beautiful".getBytes
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
+
     for (i <- 0 until numMessages) {
+      val message = randomBytesOfLen(messageLen)
       val begin = System.nanoTime
-      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
-      val received = iter.next
+
+      //Send message (of random bytes) synchronously then immediately poll for it
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
+      val recordIter = consumer.poll(timeout).iterator
+
       val elapsed = System.nanoTime - begin
-      // poor man's progress bar
+
+      //Check we got results
+      if (!recordIter.hasNext) {
+        finalise()
+        throw new RuntimeException(s"poll() timed out before finding a result (timeout:[$timeout])")
+      }
+
+      //Check result matches the original record
+      val sent = new String(message)
+      val read = new String(recordIter.next().value())
+      if (!read.equals(sent)) {
+        finalise()
+        throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
+      }
+
+      //Check we only got the one message
+      if (recordIter.hasNext) {
+        var count = 1
+        for (elem <- recordIter) count += 1
+        throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
+      }
+
+      //Report progress
       if (i % 1000 == 0)
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
-      latencies(i) = (elapsed / 1000 / 1000)
+      latencies(i) = elapsed / 1000 / 1000
     }

+    //Results
     println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
     Arrays.sort(latencies)
     val p50 = latencies((latencies.length * 0.5).toInt)
     val p99 = latencies((latencies.length * 0.99).toInt)
     val p999 = latencies((latencies.length * 0.999).toInt)
     println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
-    producer.close()
-    connector.commitOffsets(true)
-    connector.shutdown()
-    System.exit(0)
+
+    finalise()
+  }
+
+  def randomBytesOfLen(len: Int): Array[Byte] = {
+    Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
   }
 }
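One detail worth spelling out from the tail of the new main(): the percentile report indexes directly into the sorted latency array, so with 10000 samples the 99th percentile is simply the value at index 9900. Note too that each sample is stored as elapsed / 1000 / 1000, i.e. truncated to whole milliseconds, so a sub-millisecond round trip is recorded as 0. A small standalone check of the index arithmetic, on synthetic data:

// Index arithmetic used by the percentile report above, run on illustrative data.
object PercentileSketch {
  def main(args: Array[String]) {
    val latencies = Array.fill(10000)(scala.util.Random.nextInt(50).toLong)
    java.util.Arrays.sort(latencies)
    // (length * fraction).toInt truncates toward zero, e.g. 10000 * 0.99 = 9900
    val p50 = latencies((latencies.length * 0.5).toInt)    // index 5000
    val p99 = latencies((latencies.length * 0.99).toInt)   // index 9900
    val p999 = latencies((latencies.length * 0.999).toInt) // index 9990
    println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
  }
}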

tests/kafkatest/services/performance.py

@@ -127,8 +127,8 @@ class EndToEndLatencyService(PerformanceService):
             'bootstrap_servers': self.kafka.bootstrap_servers(),
         })
         cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
-              "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
-              "%(consumer_fetch_max_wait)d %(acks)d" % args
+              "%(bootstrap_servers)s %(topic)s %(num_records)d "\
+              "%(acks)d 20" % args
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
         results = {}
         for line in node.account.ssh_capture(cmd):
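The system-test service drops the ZooKeeper connect string and fetch-max-wait arguments to match the tool's new signature, and hard-codes the trailing message size to 20 bytes. For illustration, rendering the template with num_records = 10000 and acks = 1 against a hypothetical broker worker1:9092 and topic test would produce:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency worker1:9092 test 10000 1 20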