MINOR: Fix throttle usage in reassignment test case (#7822)

The replication throttle in `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress` was not setting the quota on the partition correctly, so the test case was not working as expected. This patch fixes the problem and also fixes a bug in `waitForTopicCreated` which caused the function to always wait for the full timeout.

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Jason Gustafson 2019-12-17 08:20:25 -08:00 committed by GitHub
parent b50d925e07
commit fd7991ae23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 61 deletions

View File

@ -668,9 +668,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val sameMoveTp = new TopicPartition("orders", 2)
// Throttle to ensure we minimize race conditions and test flakiness
throttle(Seq("orders"), throttleSettingForSeconds(10), Map(
sameMoveTp -> Seq(0, 1, 2)
))
throttle(Seq("orders"), throttleSettingForSeconds(10), Set(sameMoveTp))
servers.foreach(_.shutdown())
adminClient.close()
@ -758,8 +756,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
produceMessages(tp0.topic, 500, acks = -1, valueLength = 100 * 1000)
TestUtils.throttleAllBrokersReplication(adminClient, Seq(101), throttleBytes = 1)
TestUtils.assignThrottledPartitionReplicas(adminClient, Map(tp0 -> Seq(101)))
TestUtils.setReplicationThrottleForPartitions(adminClient, Seq(101), Set(tp0), throttleBytes = 1)
adminClient.alterPartitionReassignments(
Map(reassignmentEntry(tp0, Seq(100, 101))).asJava
@ -778,8 +775,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
assertTrue(isAssignmentInProgress(tp0))
TestUtils.resetBrokersThrottle(adminClient, Seq(101))
TestUtils.removePartitionReplicaThrottles(adminClient, Set(tp0))
TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp0))
waitForAllReassignmentsToComplete()
assertEquals(Seq(100, 101), zkClient.getReplicasForPartition(tp0))
@ -802,10 +798,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
assertTrue(adminClient.listPartitionReassignments().reassignments().get().isEmpty)
// Throttle to ensure we minimize race conditions and test flakiness
throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
tp0 -> Seq(100, 101),
tp2 -> Seq(100, 101)
))
throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp2))
adminClient.alterPartitionReassignments(
Map(reassignmentEntry(tp0, Seq(101)),
reassignmentEntry(tp2, Seq(101))).asJava
@ -832,10 +826,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)
// Throttle to ensure we minimize race conditions and test flakiness
throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
tp0 -> Seq(100, 101, 102),
tp1 -> Seq(100, 101, 102)
))
throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1))
adminClient.alterPartitionReassignments(
Map(reassignmentEntry(tp0, Seq(100, 101, 102)),
@ -880,10 +871,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
// Throttle to avoid race conditions
val throttleSetting = throttleSettingForSeconds(10)
throttle(Seq(topicName), throttleSetting, Map(
tp0 -> Seq(100, 101, 102),
tp1 -> Seq(100, 101, 102)
))
throttle(Seq(topicName), throttleSetting, Set(tp0, tp1))
// API reassignment to 101 for both partitions
adminClient.alterPartitionReassignments(
@ -938,11 +926,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)
// Throttle to avoid race conditions
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Map(
tpA0 -> Seq(100, 101, 102),
tpA1 -> Seq(100, 101, 102),
tpB0 -> Seq(100, 101, 102)
))
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Set(tpA0, tpA1, tpB0))
// 1. znode reassignment to 101 for TP A-0, A-1
val topicJson = executeAssignmentJson(Seq(
@ -985,9 +969,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
adminClient = createAdminClient(servers)
createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100, 101)), servers = servers)
// Throttle to ensure we minimize race conditions and test flakiness
throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
tp0 -> Seq(100, 101, 102)
))
throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0))
// move to [102, 101]
adminClient.alterPartitionReassignments(
@ -1016,10 +998,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)
// Throttle to avoid race conditions
throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
tp0 -> Seq(100, 101),
tp1 -> Seq(100, 101)
))
throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1))
val move = Map(
tp0 -> Seq(101),
@ -1060,12 +1039,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)
// Throttle to avoid race conditions
throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
tp0 -> Seq(100, 101, 102),
tp1 -> Seq(100, 101, 102),
tp2 -> Seq(100, 101, 102),
tp3 -> Seq(100, 101, 102)
))
throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1, tp2, tp3))
// API reassignment to 101 for tp0 and tp1
adminClient.alterPartitionReassignments(
@ -1115,11 +1089,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)
// Throttle to avoid race conditions
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Map(
tpA0 -> Seq(100, 101, 102),
tpA1 -> Seq(100, 101, 102),
tpB0 -> Seq(100, 101, 102)
))
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Set(tpA0, tpA1, tpB0))
adminClient.alterPartitionReassignments(Map(reassignmentEntry(tpA0, Seq(101))).asJava).all().get()
val apiReassignmentsInProgress1 = adminClient.listPartitionReassignments().reassignments().get()
@ -1180,10 +1150,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)
// Throttle to avoid race conditions
throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
tp0 -> Seq(100, 101),
tp1 -> Seq(100, 101)
))
throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1))
// Alter `topicName` partition reassignment
adminClient.alterPartitionReassignments(
@ -1308,13 +1275,12 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
ThrottleSetting(throttle.toString, messagesPerSecond * secondsDuration, messageSize)
}
def throttle(topics: Seq[String], throttle: ThrottleSetting, replicasToThrottle: Map[TopicPartition, Seq[Int]]): Unit = {
def throttle(topics: Seq[String], throttle: ThrottleSetting, partitions: Set[TopicPartition]): Unit = {
val messagesPerTopic = throttle.numMessages / topics.size
for (topic <- topics) {
produceMessages(topic, numMessages = messagesPerTopic, acks = 0, valueLength = throttle.messageSizeBytes)
}
TestUtils.throttleAllBrokersReplication(adminClient, brokerIds, throttle.throttleBytes.toInt)
TestUtils.assignThrottledPartitionReplicas(adminClient, replicasToThrottle)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, partitions, throttle.throttleBytes.toInt)
}
private def produceMessages(topic: String, numMessages: Int, acks: Int, valueLength: Int): Unit = {

View File

@ -26,6 +26,7 @@ import kafka.utils.{Exit, Logging, TestUtils}
import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
import org.apache.kafka.common.internals.Topic
@ -93,7 +94,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = {
val finishTime = System.currentTimeMillis() + timeout
var result = false
while (System.currentTimeMillis() < finishTime || !result) {
while (System.currentTimeMillis() < finishTime && !result) {
val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get()
result = topics.contains(topicName)
Thread.sleep(100)
@ -655,31 +656,29 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
val configMap = new java.util.HashMap[String, String]()
val replicationFactor: Short = 1
val partitions = 1
val tp = new TopicPartition(testTopicName, 0)
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
waitForTopicCreated(testTopicName)
for (msg <- 0 to 10) {
TestUtils.produceMessage(servers, testTopicName, s"$msg")
}
TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
val brokerIds = servers.map(_.config.brokerId)
TestUtils.throttleAllBrokersReplication(adminClient, brokerIds, throttleBytes = 1)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).all().get().get(testTopicName)
val firstPartition = testTopicDesc.partitions().asScala.head
val firstTopicPartition = new TopicPartition(testTopicName, firstPartition.partition())
val replicasOfFirstPartition = firstPartition.replicas().asScala.map(_.id())
val targetReplica = brokerIds.diff(replicasOfFirstPartition).head
adminClient.alterPartitionReassignments(Collections.singletonMap(firstTopicPartition,
adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica)))))
// let's wait until the LAIR is propagated
TestUtils.waitUntilTrue(() => {
val reassignments = adminClient.listPartitionReassignments(Collections.singleton(firstTopicPartition)).reassignments().get()
!reassignments.get(firstTopicPartition).addingReplicas().isEmpty
val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
!reassignments.get(tp).addingReplicas().isEmpty
}, "Reassignment didn't add the second node")
// describe the topic and test if it's under-replicated
@ -693,7 +692,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
assertEquals("--under-replicated-partitions shouldn't return anything", "", underReplicatedOutput)
TestUtils.resetBrokersThrottle(adminClient, brokerIds)
TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp))
TestUtils.waitForAllReassignmentsToComplete(adminClient)
}
@ -793,4 +792,5 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
assertTrue(output.contains(testTopicName))
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
}
}

View File

@ -1631,6 +1631,29 @@ object TestUtils extends Logging {
}
}
/**
* Set broker replication quotas and enable throttling for a set of partitions. This
* will override any previous replication quotas, but will leave the throttling status
* of other partitions unaffected.
*/
def setReplicationThrottleForPartitions(admin: Admin,
brokerIds: Seq[Int],
partitions: Set[TopicPartition],
throttleBytes: Int): Unit = {
throttleAllBrokersReplication(admin, brokerIds, throttleBytes)
assignThrottledPartitionReplicas(admin, partitions.map(_ -> brokerIds).toMap)
}
/**
* Remove a set of throttled partitions and reset the overall replication quota.
*/
def removeReplicationThrottleForPartitions(admin: Admin,
brokerIds: Seq[Int],
partitions: Set[TopicPartition]): Unit = {
removePartitionReplicaThrottles(admin, partitions)
resetBrokersThrottle(admin, brokerIds)
}
/**
* Throttles all replication across the cluster.
* @param adminClient is the adminClient to use for making connection with the cluster