From 044b56b8b5106f5115de7054ac95e199f1b621ea Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Tue, 10 Jan 2017 00:42:41 -0800 Subject: [PATCH] MINOR: Add Replication Quotas Test Rig This test rig lives in the other.kafka package so isn't part of our standard tests. It provides a convenient mechanism for measuring throttling performance over time. Measurements for each experiment are charted and presented to the user in an html file. The output looks like this: **Experiment4** - BrokerCount: 25 - PartitionCount: 100 - Throttle: 4,000,000 B/s - MsgCount: 1,000 - MsgSize: 100,000 - TargetBytesPerBrokerMB: 400 ![image](https://cloud.githubusercontent.com/assets/1297498/19070450/3251bc52-8a23-11e6-88fe-94de6b9147c2.png) ![image](https://cloud.githubusercontent.com/assets/1297498/19070467/4c19f38e-8a23-11e6-986a-ba19d16819ca.png) Author: Ben Stopford Reviewers: Ewen Cheslack-Postava Closes #1957 from benstopford/throttling-test-rig --- build.gradle | 1 + .../kafka/ReplicationQuotasTestRig.scala | 334 ++++++++++++++++++ .../scala/unit/kafka/utils/TestUtils.scala | 4 +- gradle/dependencies.gradle | 4 +- 4 files changed, 340 insertions(+), 3 deletions(-) create mode 100644 core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala diff --git a/build.gradle b/build.gradle index 87ffb08f3cd..dba33766bb6 100644 --- a/build.gradle +++ b/build.gradle @@ -416,6 +416,7 @@ project(':core') { testCompile libs.apachedsJdbmPartition testCompile libs.junit testCompile libs.scalaTest + testCompile libs.jfreechart scoverage libs.scoveragePlugin scoverage libs.scoverageRuntime diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala new file mode 100644 index 00000000000..cb49162226a --- /dev/null +++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package other.kafka + +import java.io.{File, FileOutputStream, PrintWriter} +import javax.imageio.ImageIO + +import kafka.admin.ReassignPartitionsCommand +import kafka.common.TopicAndPartition +import org.apache.kafka.common.TopicPartition +import kafka.server.{KafkaConfig, KafkaServer, QuotaType} +import kafka.utils.TestUtils._ +import kafka.utils.ZkUtils._ +import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.clients.producer.ProducerRecord +import org.jfree.chart.plot.PlotOrientation +import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart} +import org.jfree.data.xy.{XYSeries, XYSeriesCollection} + +import scala.collection.JavaConverters._ +import scala.collection.{Map, Seq, mutable} + +/** + * Test rig for measuring throttling performance. 
Configure the parameters for a set of experiments, then execute them + * and view the HTML output file, with charts, that is produced. You can also render the charts to the screen if + * you wish. + * + * Currently you'll need about 40GB of disk space to run these experiments (largest data written x2). Tune the msgSize + * & #partitions and throttle to adjust this. + */ +object ReplicationQuotasTestRig { + new File("Experiments").mkdir() + private val dir = "Experiments/Run" + System.currentTimeMillis().toString.substring(8) + new File(dir).mkdir() + val k = 1000 * 1000 + + + def main(args: Array[String]): Unit = { + val displayChartsOnScreen = if (args.length > 0 && args(0) == "show-gui") true else false + val journal = new Journal() + + val experiments = Seq( + //1GB total data written, will take 210s + new ExperimentDef("Experiment1", brokers = 5, partitions = 20, throttle = 1 * k, msgsPerPartition = 500, msgSize = 100 * 1000), + //5GB total data written, will take 110s + new ExperimentDef("Experiment2", brokers = 5, partitions = 50, throttle = 10 * k, msgsPerPartition = 1000, msgSize = 100 * 1000), + //5GB total data written, will take 110s + new ExperimentDef("Experiment3", brokers = 50, partitions = 50, throttle = 2 * k, msgsPerPartition = 1000, msgSize = 100 * 1000), + //10GB total data written, will take 110s + new ExperimentDef("Experiment4", brokers = 25, partitions = 100, throttle = 4 * k, msgsPerPartition = 1000, msgSize = 100 * 1000), + //20GB total data written, will take 80s + new ExperimentDef("Experiment5", brokers = 5, partitions = 50, throttle = 50 * k, msgsPerPartition = 4000, msgSize = 100 * 1000) + ) + experiments.foreach(run(_, journal, displayChartsOnScreen)) + + if (!displayChartsOnScreen) + System.exit(0) + } + + def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean) { + val experiment = new Experiment() + try { + experiment.setUp + experiment.run(config, journal, displayChartsOnScreen) + journal.footer() + } + catch { + case e: Exception => e.printStackTrace() + } + finally { + experiment.tearDown + } + } + + case class ExperimentDef(name: String, brokers: Int, partitions: Int, throttle: Long, msgsPerPartition: Int, msgSize: Int) { + val targetBytesPerBrokerMB: Long = msgsPerPartition.toLong * msgSize.toLong * partitions.toLong / brokers.toLong / 1000000 + } + + class Experiment extends ZooKeeperTestHarness with Logging { + val topicName = "my-topic" + var experimentName = "unset" + val partitionId = 0 + var servers: Seq[KafkaServer] = null + val leaderRates = mutable.Map[Int, Array[Double]]() + val followerRates = mutable.Map[Int, Array[Double]]() + + def startBrokers(brokerIds: Seq[Int]) { + println("Starting Brokers") + servers = brokerIds.map(i => createBrokerConfig(i, zkConnect)) + .map(c => createServer(KafkaConfig.fromProps(c))) + } + + override def tearDown() { + servers.par.foreach(_.shutdown()) + servers.par.foreach(server => CoreUtils.delete(server.config.logDirs)) + super.tearDown() + } + + def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean) { + experimentName = config.name + val brokers = (100 to 100 + config.brokers) + var count = 0 + val shift = Math.round(config.brokers / 2) + + def nextReplicaRoundRobin(): Int = { + count = count + 1 + 100 + (count + shift) % config.brokers + } + val replicas = (0 to config.partitions).map(partition => partition -> Seq(nextReplicaRoundRobin())).toMap + + startBrokers(brokers) + createTopic(zkUtils, topicName, replicas, servers) + + println("Writing Data") + 
val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5, acks = 0) + (0 until config.msgsPerPartition).foreach { x => + (0 until config.partitions).foreach { partition => + producer.send(new ProducerRecord(topicName, partition, null, new Array[Byte](config.msgSize))) + } + } + + println("Starting Reassignment") + val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1 + + val start = System.currentTimeMillis() + ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), config.throttle) + + //Await completion + waitForReassignmentToComplete() + println(s"Reassignment took ${(System.currentTimeMillis() - start)/1000}s") + + validateAllOffsetsMatch(config) + + journal.appendToJournal(config) + renderChart(leaderRates, "Leader", journal, displayChartsOnScreen) + renderChart(followerRates, "Follower", journal, displayChartsOnScreen) + logOutput(config, replicas, newAssignment) + + println("Output can be found here: " + journal.path()) + } + + def validateAllOffsetsMatch(config: ExperimentDef): Unit = { + //Validate that offsets are correct in all brokers + for (broker <- servers) { + (0 until config.partitions).foreach { partitionId => + val offset = broker.getLogManager.getLog(new TopicPartition(topicName, partitionId)).map(_.logEndOffset).getOrElse(-1L) + if (offset >= 0 && offset != config.msgsPerPartition) { + throw new RuntimeException(s"Run failed as offsets did not match for partition $partitionId on broker ${broker.config.brokerId}. Expected ${config.msgsPerPartition} but was $offset.") + } + } + } + } + + def logOutput(config: ExperimentDef, replicas: Map[Int, Seq[Int]], newAssignment: Map[TopicAndPartition, Seq[Int]]): Unit = { + val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName) + val existing = zkUtils.getReplicaAssignmentForTopics(newAssignment.map(_._1.topic).toSeq) + + //Long stats + println("The replicas are " + replicas.toSeq.sortBy(_._1).map("\n" + _)) + println("This is the current replica assignment:\n" + actual.toSeq) + println("The proposed assignment is: \n" + newAssignment) + println("This is the assignment we ended up with: " + actual) + + //Test Stats + println(s"numBrokers: ${config.brokers}") + println(s"numPartitions: ${config.partitions}") + println(s"throttle: ${config.throttle}") + println(s"numMessagesPerPartition: ${config.msgsPerPartition}") + println(s"msgSize: ${config.msgSize}") + println(s"We will write ${config.targetBytesPerBrokerMB}MB of data per broker") + println(s"Worst case duration is ${config.targetBytesPerBrokerMB * 1000 * 1000/ config.throttle}") + } + + private def waitForOffsetsToMatch(offset: Int, partitionId: Int, broker: KafkaServer, topic: String): Boolean = waitUntilTrue(() => { + offset == broker.getLogManager.getLog(new TopicPartition(topic, partitionId)) + .map(_.logEndOffset).getOrElse(0) + }, s"Offsets did not match for partition $partitionId on broker ${broker.config.brokerId}", 60000) + + def waitForReassignmentToComplete() { + waitUntilTrue(() => { + printRateMetrics() + !zkUtils.pathExists(ReassignPartitionsPath) + }, s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted", 60 * 60 * 1000, pause = 1000L) + } + + def renderChart(data: mutable.Map[Int, Array[Double]], name: String, journal: Journal, displayChartsOnScreen: Boolean): Unit = { + val dataset = addDataToChart(data) + val chart = createChart(name, dataset) + + writeToFile(name, journal, chart) + 
maybeDisplayOnScreen(displayChartsOnScreen, chart) + println(s"Chart generated for $name") + } + + def maybeDisplayOnScreen(displayChartsOnScreen: Boolean, chart: JFreeChart): Unit = { + if (displayChartsOnScreen) { + val frame = new ChartFrame(experimentName, chart) + frame.pack() + frame.setVisible(true) + } + } + + def writeToFile(name: String, journal: Journal, chart: JFreeChart): Unit = { + val file = new File(dir, experimentName + "-" + name + ".png") + ImageIO.write(chart.createBufferedImage(1000, 700), "png", file) + journal.appendChart(file.getAbsolutePath, name.eq("Leader")) + } + + def createChart(name: String, dataset: XYSeriesCollection): JFreeChart = { + val chart: JFreeChart = ChartFactory.createXYLineChart( + experimentName + " - " + name + " Throttling Performance", + "Time (s)", + "Throttle Throughput (B/s)", + dataset + , PlotOrientation.VERTICAL, false, true, false + ) + chart + } + + def addDataToChart(data: mutable.Map[Int, Array[Double]]): XYSeriesCollection = { + val dataset = new XYSeriesCollection + data.foreach { case (broker, values) => + val series = new XYSeries("Broker:" + broker) + var x = 0 + values.foreach { value => + series.add(x, value) + x += 1 + } + dataset.addSeries(series) + } + dataset + } + + def record(rates: mutable.Map[Int, Array[Double]], brokerId: Int, currentRate: Double) = { + var leaderRatesBroker: Array[Double] = rates.getOrElse(brokerId, Array[Double]()) + leaderRatesBroker = leaderRatesBroker ++ Array(currentRate) + rates.put(brokerId, leaderRatesBroker) + } + + def printRateMetrics() { + for (broker <- servers) { + val leaderRate: Double = measuredRate(broker, QuotaType.LeaderReplication) + if (broker.config.brokerId == 100) + info("waiting... Leader rate on 101 is " + leaderRate) + record(leaderRates, broker.config.brokerId, leaderRate) + if (leaderRate > 0) + trace("Leader Rate on " + broker.config.brokerId + " is " + leaderRate) + + val followerRate: Double = measuredRate(broker, QuotaType.FollowerReplication) + record(followerRates, broker.config.brokerId, followerRate) + if (followerRate > 0) + trace("Follower Rate on " + broker.config.brokerId + " is " + followerRate) + } + } + + private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = { + val metricName = broker.metrics.metricName("byte-rate", repType.toString) + if (broker.metrics.metrics.asScala.contains(metricName)) + broker.metrics.metrics.asScala(metricName).value + else -1 + } + + def json(topic: String*): String = { + val topicStr = topic.map { + t => "{\"topic\": \"" + t + "\"}" + }.mkString(",") + s"""{"topics": [$topicStr],"version":1}""" + } + } + + class Journal { + private val log = new File(dir, "Log.html") + header() + + def appendToJournal(config: ExperimentDef): Unit = { + val message = s"\n\n

<h3>${config.name}</h3>" +
+        s"<p>- BrokerCount: ${config.brokers}" +
+        s"<p>- PartitionCount: ${config.partitions}" +
+        f"<p>- Throttle: ${config.throttle}%,.0f B/s" +
+        f"<p>- MsgCount: ${config.msgsPerPartition}%,.0f " +
+        f"<p>- MsgSize: ${config.msgSize}%,.0f" +
+        s"<p>- TargetBytesPerBrokerMB: ${config.targetBytesPerBrokerMB}<p>"
+      append(message)
+    }
+
+    def appendChart(path: String, first: Boolean): Unit = {
+      val message = new StringBuilder
+      if (first)
+        message.append("<p><p>")
+      message.append("<img src=\"" + path + "\" alt=\"Chart\">")
+      if (!first)
+        message.append("<p><p>")
+      append(message.toString())
+    }
+
+    def header(): Unit = {
+      append("<html><head><h1>Replication Quotas Test Rig</h1></head><body>")
+    }
+
+    def footer(): Unit = {
+      append("</body></html>")
+    }
+
+    def append(message: String): Unit = {
+      val stream = new FileOutputStream(log, true)
+      new PrintWriter(stream) {
+        append(message)
+        close
+      }
+    }
+
+    def path(): String = {
+      log.getAbsolutePath
+    }
+  }
+
+}
+
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 06d88eed3be..24ec1c2eb7f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -747,14 +747,14 @@ object TestUtils extends Logging {
   /**
    * Wait until the given condition is true or throw an exception if the given wait time elapses.
    */
-  def waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Boolean = {
+  def waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = {
     val startTime = System.currentTimeMillis()
     while (true) {
       if (condition())
         return true
       if (System.currentTimeMillis() > startTime + waitTime)
         fail(msg)
-      Thread.sleep(waitTime.min(100L))
+      Thread.sleep(waitTime.min(pause))
     }
     // should never hit here
     throw new RuntimeException("unexpected error")
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 1ad63460347..b5fdb951a6f 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -46,6 +46,7 @@ versions += [
   snappy: "1.1.2.6",
   zkclient: "0.10",
   zookeeper: "3.4.9",
+  jfreechart: "1.0.0",
 ]
 
 // Add Scala version
@@ -107,5 +108,6 @@ libs += [
   slf4jlog4j: "org.slf4j:slf4j-log4j12:$versions.slf4j",
   snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
   zkclient: "com.101tec:zkclient:$versions.zkclient",
-  zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper"
+  zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper",
+  jfreechart: "jfreechart:jfreechart:$versions.jfreechart"
 ]
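
As a sanity check on the experiment sizing above: the journal numbers follow directly from the arithmetic in `ExperimentDef.targetBytesPerBrokerMB` and the worst-case estimate printed by `logOutput`. The sketch below is illustrative only and not part of the patch (the `Experiment4Sizing` object is hypothetical); it simply restates Experiment4's parameters from the description:

```scala
// Reproduces the sizing arithmetic from ExperimentDef/logOutput for Experiment4.
object Experiment4Sizing extends App {
  val brokers = 25
  val partitions = 100
  val throttle = 4 * 1000 * 1000L   // bytes/s, i.e. 4,000,000 B/s
  val msgsPerPartition = 1000
  val msgSize = 100 * 1000          // 100,000 bytes per message

  // Same formula as ExperimentDef.targetBytesPerBrokerMB
  val targetBytesPerBrokerMB = msgsPerPartition.toLong * msgSize * partitions / brokers / 1000000
  // Same formula as the "Worst case duration" estimate in logOutput (seconds)
  val worstCaseSecs = targetBytesPerBrokerMB * 1000 * 1000 / throttle

  println(s"TargetBytesPerBrokerMB: $targetBytesPerBrokerMB") // 400, matching the journal output above
  println(s"Worst case duration: ${worstCaseSecs}s")          // 100s at a 4,000,000 B/s throttle
}
```

Moving ~400 MB per broker through a 4,000,000 B/s throttle puts a floor of roughly 100 s on the reassignment, which is consistent with the ~110 s noted in the Experiment4 comment.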