KAFKA-7940: Fix flaky test case in `CustomQuotaCallbackTest` (#9777)

This patch attempts to fix `CustomQuotaCallbackTest#testCustomQuotaCallback`. The test creates 99 partitions in a topic, and expects that we can get the partition info for all of them after 15 seconds. If we cannot, then we'll get the error:
```
org.scalatest.exceptions.TestFailedException: Partition [group1_largeTopic,69] metadata not propagated after 15000 ms
```
15 seconds is not enough to finish creating the 99 partitions on a slow system. So we fix it by explicitly waiting until the topic has the expected number of partitions before retrieving each partition's info, and we increase the wait time to 60 seconds for all partition metadata to be propagated.
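
As a rough sketch of the new pattern (the helper names, signatures, and the 60-second bound come from the `TestUtils` changes in the diff below; the wrapper function and its use of `group1_largeTopic` with 99 partitions are illustrative, not the actual `CustomQuotaCallbackTest` code), a test can now do a single bounded wait for all partitions and then read each partition's metadata from the returned map:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import kafka.server.KafkaServer
import kafka.utils.TestUtils

// Illustrative wrapper only: `servers` is the Seq[KafkaServer] exposed by Kafka's
// integration test harnesses; the topic name and partition count are taken from the
// commit description above, not from the real test source.
def waitForLargeTopicMetadata(servers: Seq[KafkaServer]): Map[Int, Int] = {
  val topic = "group1_largeTopic"
  val expectedNumPartitions = 99

  // Single wait (up to 60 seconds) until every broker's metadata cache reports all
  // partitions, instead of a separate 15-second wait per partition.
  val metadata: Map[TopicPartition, UpdateMetadataPartitionState] =
    TestUtils.waitForAllPartitionsMetadata(servers, topic, expectedNumPartitions)

  // The returned map already carries the per-partition state, e.g. the current leader id.
  metadata.map { case (tp, state) => tp.partition -> state.leader() }
}
```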

Reviewers: Jason Gustafson <jason@confluent.io>
Author: Luke Chen, 2021-02-02 02:16:28 +08:00 (committed by GitHub)
parent db73d86ea6
commit 7feb55731e
12 changed files with 88 additions and 60 deletions

@@ -694,9 +694,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
}.toList.asJava).all().get()
topics.foreach {
case (topicName, parts) =>
parts.indices.foreach {
index => TestUtils.waitUntilMetadataIsPropagated(servers, topicName, index)
}
TestUtils.waitForAllPartitionsMetadata(servers, topicName, parts.size)
}
}

@@ -373,8 +373,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
TestUtils.waitForPartitionMetadata(servers, topic, 0)
TestUtils.waitForPartitionMetadata(servers, topic, 1)
// send records to the newly added partition after confirming that metadata have been updated.
val futures1 = (1 to numRecords).map { i =>

@@ -83,8 +83,8 @@ class AddPartitionsTest extends BaseRequestTest {
assertEquals(leader2, leader2FromZk)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
TestUtils.waitForPartitionMetadata(servers, topic1, 1)
TestUtils.waitForPartitionMetadata(servers, topic1, 2)
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
@@ -116,8 +116,8 @@ class AddPartitionsTest extends BaseRequestTest {
assertEquals(leader2, leader2FromZk)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
TestUtils.waitForPartitionMetadata(servers, topic2, 1)
TestUtils.waitForPartitionMetadata(servers, topic2, 2)
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
@@ -137,12 +137,12 @@ class AddPartitionsTest extends BaseRequestTest {
adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
TestUtils.waitForPartitionMetadata(servers, topic3, 1)
TestUtils.waitForPartitionMetadata(servers, topic3, 2)
TestUtils.waitForPartitionMetadata(servers, topic3, 3)
TestUtils.waitForPartitionMetadata(servers, topic3, 4)
TestUtils.waitForPartitionMetadata(servers, topic3, 5)
TestUtils.waitForPartitionMetadata(servers, topic3, 6)
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
@@ -162,8 +162,8 @@ class AddPartitionsTest extends BaseRequestTest {
adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
TestUtils.waitForPartitionMetadata(servers, topic2, 1)
TestUtils.waitForPartitionMetadata(servers, topic2, 2)
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic2).asJava, false).build)

@@ -75,7 +75,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
}
private[this] def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = {
TestUtils.waitUntilMetadataIsPropagated(servers, topicName, partition = 0, timeout)
TestUtils.waitForPartitionMetadata(servers, topicName, partition = 0, timeout)
}
@BeforeEach
@@ -530,7 +530,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
try {
killBroker(0)
val aliveServers = servers.filterNot(_.config.brokerId == 0)
TestUtils.waitUntilMetadataIsPropagated(aliveServers, testTopicName, 0)
TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
val rows = output.split("\n")
@@ -552,7 +552,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
try {
killBroker(0)
val aliveServers = servers.filterNot(_.config.brokerId == 0)
TestUtils.waitUntilMetadataIsPropagated(aliveServers, testTopicName, 0)
TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
val rows = output.split("\n")
@@ -672,7 +672,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
try {
killBroker(0)
val aliveServers = servers.filterNot(_.config.brokerId == 0)
TestUtils.waitUntilMetadataIsPropagated(aliveServers, underMinIsrTopic, 0)
TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
val rows = output.split("\n")

@@ -257,7 +257,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
zkClient.setTopicAssignment(tp0.topic, Some(Uuid.randomUuid()), expandedAssignment, firstControllerEpochZkVersion)
waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitUntilMetadataIsPropagated(servers, tp1.topic, tp1.partition)
TestUtils.waitForPartitionMetadata(servers, tp1.topic, tp1.partition)
}
@Test
@@ -277,7 +277,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
zkClient.setTopicAssignment(tp0.topic, Some(Uuid.randomUuid()), expandedAssignment, firstControllerEpochZkVersion)
waitForPartitionState(tp1, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)), tp1.topic, tp1.partition)
TestUtils.waitForPartitionMetadata(Seq(servers(controllerId)), tp1.topic, tp1.partition)
}
@Test

@@ -179,7 +179,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
debug("Follower for " + topic + " is: %s".format(followerId))
produceMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
waitForPartitionMetadata(servers, topic, partitionId)
assertEquals(List("first"), consumeAllMessages(topic, 1))
// shutdown follower server
@@ -218,7 +218,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
debug("Follower for " + topic + " is: %s".format(followerId))
produceMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
waitForPartitionMetadata(servers, topic, partitionId)
assertEquals(List("first"), consumeAllMessages(topic, 1))
// shutdown follower server
@@ -299,7 +299,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
produceMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
waitForPartitionMetadata(servers, topic, partitionId)
assertEquals(List("first"), consumeAllMessages(topic, 1))
// shutdown follower server

@@ -140,7 +140,7 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
verifyMetadata(controllerSocketServer)
if (!request.data.validateOnly) {
// Wait until metadata is propagated and validate non-controller broker has the correct metadata
TestUtils.waitUntilMetadataIsPropagated(servers, topic.name(), 0)
TestUtils.waitForPartitionMetadata(servers, topic.name(), 0)
}
verifyMetadata(notControllerSocketServer)
}
@@ -173,7 +173,7 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
}
protected def validateTopicExists(topic: String): Unit = {
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitForPartitionMetadata(servers, topic, 0)
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
assertTrue(metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE), "The topic should be created")

@@ -207,7 +207,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
else {
// broker epoch in UPDATE_METADATA >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic, tp.partition, 10000)
TestUtils.waitForPartitionMetadata(Seq(broker2), tp.topic, tp.partition, 10000)
assertEquals(brokerId2,
broker2.metadataCache.getPartitionInfo(tp.topic, tp.partition).get.leader)
}

@@ -112,9 +112,9 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
topicReq("error-timeout-negative", numPartitions = 10, replicationFactor = 3)), timeout = -1),
Map("error-timeout-negative" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
// The topics should still get created eventually
TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout", 0)
TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout-zero", 0)
TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout-negative", 0)
TestUtils.waitForPartitionMetadata(servers, "error-timeout", 0)
TestUtils.waitForPartitionMetadata(servers, "error-timeout-zero", 0)
TestUtils.waitForPartitionMetadata(servers, "error-timeout-negative", 0)
validateTopicExists("error-timeout")
validateTopicExists("error-timeout-zero")
validateTopicExists("error-timeout-negative")

@@ -131,7 +131,7 @@ class MetadataRequestTest extends BaseRequestTest {
assertEquals(Errors.LEADER_NOT_AVAILABLE, response.errors.get(autoCreatedTopic))
assertEquals(Some(servers.head.config.numPartitions), zkClient.getTopicPartitionCount(autoCreatedTopic))
for (i <- 0 until servers.head.config.numPartitions)
TestUtils.waitUntilMetadataIsPropagated(servers, autoCreatedTopic, i)
TestUtils.waitForPartitionMetadata(servers, autoCreatedTopic, i)
}
val topic1 = "t1"
@@ -189,7 +189,7 @@ class MetadataRequestTest extends BaseRequestTest {
assertEquals(topic2, topicMetadata2.topic)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0)
TestUtils.waitForPartitionMetadata(servers, topic1, 0)
// retry the metadata for the first auto created topic
val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1).asJava, true).build)

@@ -101,7 +101,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
server.startup()
// wait for the broker to receive the update metadata request after startup
TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
TestUtils.waitForPartitionMetadata(Seq(server), topic, 0)
producer = createProducer(server)
val consumer = createConsumer(server)

@@ -26,6 +26,7 @@ import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener, TopicConfigFetcher}
@@ -48,6 +49,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
@@ -283,11 +285,11 @@ object TestUtils extends Logging {
val logDirs = (1 to logDirCount).toList.map(i =>
// We would like to allow user to specify both relative path and absolute path as log directory for backward-compatibility reason
// We can verify this by using a mixture of relative path and absolute path as log directories in the test
if (i % 2 == 0) TestUtils.tempDir().getAbsolutePath else TestUtils.tempRelativeDir("data")
if (i % 2 == 0) tempDir().getAbsolutePath else tempRelativeDir("data")
).mkString(",")
props.put(KafkaConfig.LogDirsProp, logDirs)
} else {
props.put(KafkaConfig.LogDirProp, TestUtils.tempDir().getAbsolutePath)
props.put(KafkaConfig.LogDirProp, tempDir().getAbsolutePath)
}
props.put(KafkaConfig.ZkConnectProp, zkConnect)
props.put(KafkaConfig.ZkConnectionTimeoutMsProp, "10000")
@@ -349,10 +351,12 @@ object TestUtils extends Logging {
!hasSessionExpirationException},
s"Can't create topic $topic")
// wait until the update metadata request for new topic reaches all servers
// wait until we've propagated all partitions metadata to all servers
val allPartitionsMetadata = waitForAllPartitionsMetadata(servers, topic, numPartitions)
(0 until numPartitions).map { i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse(
throw new IllegalStateException(s"Cannot get the partition leader for topic: $topic, partition: $i in server metadata cache"))
}.toMap
}
@@ -391,10 +395,12 @@ object TestUtils extends Logging {
!hasSessionExpirationException},
s"Can't create topic $topic")
// wait until the update metadata request for new topic reaches all servers
// wait until we've propagated all partitions metadata to all servers
val allPartitionsMetadata = waitForAllPartitionsMetadata(servers, topic, partitionReplicaAssignment.size)
partitionReplicaAssignment.keySet.map { i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse(
throw new IllegalStateException(s"Cannot get the partition leader for topic: $topic, partition: $i in server metadata cache"))
}.toMap
}
@@ -885,6 +891,32 @@ object TestUtils extends Logging {
), "Timed out waiting for broker metadata to propagate to all servers", timeout)
}
/**
* Wait until the expected number of partitions is in the metadata cache in each broker.
*
* @param servers The list of servers that the metadata should reach to
* @param topic The topic name
* @param expectedNumPartitions The expected number of partitions
* @return all partitions metadata
*/
def waitForAllPartitionsMetadata(servers: Seq[KafkaServer],
topic: String, expectedNumPartitions: Int): Map[TopicPartition, UpdateMetadataPartitionState] = {
waitUntilTrue(
() => servers.forall { server =>
server.metadataCache.numPartitions(topic) match {
case Some(numPartitions) => numPartitions == expectedNumPartitions
case _ => false
}
},
s"Topic [$topic] metadata not propagated after 60000 ms", waitTimeMs = 60000L)
// since the metadata is propagated, we should get the same metadata from each server
(0 until expectedNumPartitions).map { i =>
new TopicPartition(topic, i) -> servers.head.metadataCache.getPartitionInfo(topic, i).getOrElse(
throw new IllegalStateException(s"Cannot get topic: $topic, partition: $i in server metadata cache"))
}.toMap
}
/**
* Wait until a valid leader is propagated to the metadata cache in each broker.
* It assumes that the leader propagated to each broker is the same.
@@ -893,28 +925,26 @@ object TestUtils extends Logging {
* @param topic The topic name
* @param partition The partition Id
* @param timeout The amount of time waiting on this condition before assert to fail
* @return The leader of the partition.
* @return The metadata of the partition.
*/
def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
var leader: Int = -1
def waitForPartitionMetadata(servers: Seq[KafkaServer], topic: String, partition: Int,
timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): UpdateMetadataPartitionState = {
waitUntilTrue(
() => servers.forall { server =>
server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) if Request.isValidBrokerId(partitionState.leader) =>
leader = partitionState.leader
true
server.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) => Request.isValidBrokerId(partitionState.leader)
case _ => false
}
},
"Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
waitTimeMs = timeout)
leader
servers.head.metadataCache.getPartitionInfo(topic, partition).getOrElse(
throw new IllegalStateException(s"Cannot get topic: $topic, partition: $partition in server metadata cache"))
}
def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
val (controllerId, _) = TestUtils.computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms"))
}
@@ -1200,7 +1230,7 @@ object TestUtils extends Logging {
}
// Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
assertTrue(leaderServer.replicaManager.localLog(partition).isEmpty)
}
@@ -1448,7 +1478,7 @@ object TestUtils extends Logging {
requestTimeoutMs: Int = 30000,
maxInFlight: Int = 5): KafkaProducer[Array[Byte], Array[Byte]] = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(servers))
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
@@ -1465,7 +1495,7 @@ object TestUtils extends Logging {
def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Unit = {
val props = new Properties()
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(servers))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
for (i <- 0 until numRecords) {
@@ -1569,7 +1599,7 @@ object TestUtils extends Logging {
val topic = topicPartition.topic
val partition = topicPartition.partition
TestUtils.waitUntilTrue(() => {
waitUntilTrue(() => {
try {
val topicResult = client.describeTopics(Arrays.asList(topic)).all.get.get(topic)
val partitionResult = topicResult.partitions.get(partition)
@@ -1581,7 +1611,7 @@ object TestUtils extends Logging {
}
def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = {
TestUtils.waitUntilTrue(
waitUntilTrue(
() => {
val description = client.describeTopics(partition.map(_.topic).asJava).all.get.asScala
val isr = description
@@ -1597,7 +1627,7 @@ object TestUtils extends Logging {
}
def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = {
TestUtils.waitUntilTrue(
waitUntilTrue(
() => {
val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala
val isr = description
@@ -1613,7 +1643,7 @@ object TestUtils extends Logging {
}
def waitForReplicasAssigned(client: Admin, partition: TopicPartition, brokerIds: Seq[Int]): Unit = {
TestUtils.waitUntilTrue(
waitUntilTrue(
() => {
val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala
val replicas = description