mirror of https://github.com/apache/kafka.git
KAFKA-6331; Fix transient failure in AdminClientIntegrationTest.testAlterReplicaLogDirs
Author: Dong Lin <lindong28@gmail.com> Reviewers: Ismael Juma <ismael@juma.me.uk> Closes #4306 from lindong28/KAFKA-6331
This commit is contained in:
parent
82c6d429e7
commit
cdb3955452
|
|
@ -327,7 +327,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
|
||||||
TestUtils.waitUntilTrue(() => {
|
TestUtils.waitUntilTrue(() => {
|
||||||
val logDir = server.logManager.getLog(tp).get.dir.getParent
|
val logDir = server.logManager.getLog(tp).get.dir.getParent
|
||||||
secondReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
|
secondReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
|
||||||
}, "timed out waiting for replica movement", 6000L)
|
}, "timed out waiting for replica movement")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that replica can be moved to the specified log directory while the producer is sending messages
|
// Verify that replica can be moved to the specified log directory while the producer is sending messages
|
||||||
|
|
@ -354,14 +354,18 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
TestUtils.waitUntilTrue(() => numMessages.get > 100, "timed out waiting for message produce", 6000L)
|
TestUtils.waitUntilTrue(() => numMessages.get > 10, s"only $numMessages messages are produced before timeout. Producer future ${producerFuture.value}")
|
||||||
client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
|
client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
|
||||||
servers.foreach { server =>
|
servers.foreach { server =>
|
||||||
TestUtils.waitUntilTrue(() => {
|
TestUtils.waitUntilTrue(() => {
|
||||||
val logDir = server.logManager.getLog(tp).get.dir.getParent
|
val logDir = server.logManager.getLog(tp).get.dir.getParent
|
||||||
firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
|
firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
|
||||||
}, "timed out waiting for replica movement", 6000L)
|
}, s"timed out waiting for replica movement. Producer future ${producerFuture.value}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val currentMessagesNum = numMessages.get
|
||||||
|
TestUtils.waitUntilTrue(() => numMessages.get - currentMessagesNum > 10,
|
||||||
|
s"only ${numMessages.get - currentMessagesNum} messages are produced within timeout after replica movement. Producer future ${producerFuture.value}")
|
||||||
} finally running.set(false)
|
} finally running.set(false)
|
||||||
|
|
||||||
val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
|
val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue