KAFKA-4266; ReassignPartitionsClusterTest: ensure ZK publication is completed before start

Increase the reliability of the one temporal comparison in ReassignPartitionsClusterTest by imposing a delay after ZK is updated. This should be more reliable than just increasing the amount of data.

This relates to a previous PR: https://github.com/apache/kafka/pull/1982

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1997 from benstopford/KAFKA-4266
This commit is contained in:
Ben Stopford 2017-03-06 12:42:43 +00:00 committed by Ismael Juma
parent 79f85039d7
commit 63010cbfe5
5 changed files with 60 additions and 43 deletions

View File

@ -17,8 +17,9 @@
package kafka.admin
import joptsimple.OptionParser
import kafka.server.{DynamicConfig, ConfigType}
import kafka.server.{ConfigType, DynamicConfig}
import kafka.utils._
import scala.collection._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
@ -29,6 +30,10 @@ import org.apache.kafka.common.security.JaasUtils
object ReassignPartitionsCommand extends Logging {
case class Throttle(value: Long, postUpdateAction: () => Unit = () => ())
private[admin] val NoThrottle = Throttle(-1)
def main(args: Array[String]): Unit = {
val opts = validateAndParseArgs(args)
@ -146,10 +151,10 @@ object ReassignPartitionsCommand extends Logging {
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1
executeAssignment(zkUtils, reassignmentJsonString, throttle)
executeAssignment(zkUtils, reassignmentJsonString, Throttle(throttle))
}
def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Throttle) {
val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
@ -160,7 +165,7 @@ object ReassignPartitionsCommand extends Logging {
}
else {
printCurrentAssignment(zkUtils, partitionsToBeReassigned)
if (throttle >= 0)
if (throttle.value >= 0)
println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
if (reassignPartitionsCommand.reassignPartitions(throttle)) {
println("Successfully started reassignment of partitions.")
@ -178,7 +183,7 @@ object ReassignPartitionsCommand extends Logging {
def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): Seq[(TopicAndPartition, Seq[Int])] = {
val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
if (partitionsToBeReassigned.isEmpty)
throw new AdminCommandFailedException("Partition reassignment data file is empty")
val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
@ -306,15 +311,19 @@ object ReassignPartitionsCommand extends Logging {
class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicAndPartition, Seq[Int]], admin: AdminUtilities = AdminUtils)
extends Logging {
import ReassignPartitionsCommand._
def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
val proposedTopics = proposedAssignment.keySet.map(_.topic).toSeq
zkUtils.getReplicaAssignmentForTopics(proposedTopics)
}
private def maybeThrottle(throttle: Long): Unit = {
if (throttle >= 0) {
maybeLimit(throttle)
private def maybeThrottle(throttle: Throttle): Unit = {
if (throttle.value >= 0) {
assignThrottledReplicas(existingAssignment(), proposedAssignment)
maybeLimit(throttle)
throttle.postUpdateAction()
println(s"The throttle limit was set to ${throttle.value} B/s")
}
}
@ -322,19 +331,18 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
* Limit the throttle on currently moving replicas. Note that this command can use used to alter the throttle, but
* it may not alter all limits originally set, if some of the brokers have completed their rebalance.
*/
def maybeLimit(throttle: Long) {
if (throttle >= 0) {
def maybeLimit(throttle: Throttle) {
if (throttle.value >= 0) {
val existingBrokers = existingAssignment().values.flatten.toSeq
val proposedBrokers = proposedAssignment.values.flatten.toSeq
val brokers = (existingBrokers ++ proposedBrokers).distinct
for (id <- brokers) {
val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString)
configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.value.toString)
configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.value.toString)
admin.changeBrokerConfig(zkUtils, Seq(id), configs)
}
println(s"The throttle limit was set to $throttle B/s")
}
}
@ -384,7 +392,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
allProposed.filter { case (tp, _) => tp.topic == topic })
}
def reassignPartitions(throttle: Long = -1): Boolean = {
def reassignPartitions(throttle: Throttle = NoThrottle): Boolean = {
maybeThrottle(throttle)
try {
val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
@ -422,6 +430,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
}
}
sealed trait ReassignmentStatus { def status: Int }
case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 }
case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 }

View File

@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, PrintWriter}
import javax.imageio.ImageIO
import kafka.admin.ReassignPartitionsCommand
import kafka.admin.ReassignPartitionsCommand.Throttle
import kafka.common.TopicAndPartition
import org.apache.kafka.common.TopicPartition
import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
@ -139,7 +140,7 @@ object ReplicationQuotasTestRig {
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
val start = System.currentTimeMillis()
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), config.throttle)
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
//Await completion
waitForReassignmentToComplete()

View File

@ -12,6 +12,7 @@
*/
package kafka.admin
import kafka.admin.ReassignPartitionsCommand._
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
@ -21,14 +22,15 @@ import kafka.zk.ZooKeeperTestHarness
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Before, Test}
import kafka.admin.ReplicationQuotaUtils._
import scala.collection.{Map, Seq}
import scala.collection.Map
import scala.collection.Seq
class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val partitionId = 0
var servers: Seq[KafkaServer] = null
val topicName = "my-topic"
val delayMs = 1000
def zkUpdateDelay = {Thread.sleep(delayMs)}
@Before
override def setUp() {
@ -55,7 +57,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers)
//When we move the replica on 100 to broker 101
ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}""")
val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}"""
ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
waitForReassignmentToComplete()
//Then the replica should be on 101
@ -74,8 +77,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
), servers = servers)
//When rebalancing
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
val newAssignment = generateAssignment(zkUtils, brokers, json(topicName), true)._1
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
waitForReassignmentToComplete()
//Then the replicas should span all three brokers
@ -95,8 +98,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
), servers = servers)
//When rebalancing
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
val newAssignment = generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
waitForReassignmentToComplete()
//Then replicas should only span the first two brokers
@ -127,7 +130,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
)
//When rebalancing
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed))
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed), NoThrottle)
waitForReassignmentToComplete()
//Then the proposed changes should have been made
@ -142,6 +145,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
@Test
def shouldExecuteThrottledReassignment() {
//Given partitions on 3 of 3 brokers
val brokers = Array(100, 101, 102)
startBrokers(brokers)
@ -150,33 +154,35 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
), servers = servers)
//Given throttle set so replication will take a certain number of secs
val initialThrottle: Long = 10 * 1000 * 1000
val initialThrottle = Throttle(10 * 1000 * 1000, () => zkUpdateDelay)
val expectedDurationSecs = 5
val numMessages: Int = 500
val msgSize: Int = 100 * 1000
produceMessages(servers, topicName, numMessages, acks = 0, msgSize)
assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle)
assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle.value)
//Start rebalance which will move replica on 100 -> replica on 102
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
val start = System.currentTimeMillis()
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
//Check throttle config. Should be throttling replica 0 on 100 and 102 only.
checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
checkThrottleConfigAddedToZK(initialThrottle.value, servers, topicName, "0:100,0:101", "0:102")
//Await completion
waitForReassignmentToComplete()
val took = System.currentTimeMillis() - start
val took = System.currentTimeMillis() - start - delayMs
//Check move occurred
val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
assertEquals(Seq(101, 102), actual.values.flatten.toSeq.distinct.sorted)
//Then command should have taken longer than the throttle rate
assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took", took > expectedDurationSecs * 0.9 * 1000)
assertTrue(s"Expected replication to be < ${expectedDurationSecs * 2 * 1000} but was $took", took < expectedDurationSecs * 2 * 1000)
assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took",
took > expectedDurationSecs * 0.9 * 1000)
assertTrue(s"Expected replication to be < ${expectedDurationSecs * 2 * 1000} but was $took",
took < expectedDurationSecs * 2 * 1000)
}
@ -211,7 +217,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
TopicAndPartition("topic1", 2) -> Seq(103, 104), //didn't move
TopicAndPartition("topic2", 2) -> Seq(103, 104) //didn't move
)
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), throttle)
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(throttle))
//Check throttle config. Should be throttling specific replicas for each topic.
checkThrottleConfigAddedToZK(throttle, servers, "topic1",
@ -238,15 +244,15 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
produceMessages(servers, topicName, numMessages = 200, acks = 0, valueBytes = 100 * 1000)
//Start rebalance
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(initialThrottle))
//Check throttle config
checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
//Ensure that running Verify, whilst the command is executing, should have no effect
ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
//Check throttle config again
checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
@ -254,7 +260,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
//Now re-run the same assignment with a larger throttle, which should only act to increase the throttle and make progress
val newThrottle = initialThrottle * 1000
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), newThrottle)
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(newThrottle))
//Check throttle was changed
checkThrottleConfigAddedToZK(newThrottle, servers, topicName, "0:100,0:101", "0:102")
@ -263,7 +269,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
waitForReassignmentToComplete()
//Verify should remove the throttle
ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
//Check removed
checkThrottleConfigRemovedFromZK(topicName, servers)
@ -280,12 +286,13 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
//When we execute an assignment that includes an invalid partition (1:101 in this case)
ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}""")
val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}"""
ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
}
@Test
def shouldPerformThrottledReassignmentOverVariousTopics() {
val throttle = 1000L
val throttle = Throttle(1000L)
//Given four brokers
servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf)))

View File

@ -18,6 +18,7 @@ package kafka.admin
import java.util.Properties
import kafka.admin.ReassignPartitionsCommand.Throttle
import kafka.common.TopicAndPartition
import kafka.log.LogConfig
import kafka.log.LogConfig._
@ -248,7 +249,7 @@ class ReassignPartitionsCommandTest extends Logging {
replay(admin)
//When
assigner.maybeLimit(1000)
assigner.maybeLimit(Throttle(1000))
//Then
for (actual <- propsCapture.getValues) {
@ -282,7 +283,7 @@ class ReassignPartitionsCommandTest extends Logging {
replay(admin)
//When
assigner.maybeLimit(1000)
assigner.maybeLimit(Throttle(1000))
//Then
for (actual <- propsCapture.getValues) {
@ -312,7 +313,7 @@ class ReassignPartitionsCommandTest extends Logging {
replay(admin)
//When
assigner.maybeLimit(1000)
assigner.maybeLimit(Throttle(1000))
//Then other property remains
for (actual <- propsCapture.getValues) {

View File

@ -31,7 +31,6 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
/**