KAFKA-16655: Deflake ZKMigrationIntegrationTest.testDualWrite #15845

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Johnny Hsu <44309740+johnnychhsu@users.noreply.github.com>
This commit is contained in:
Alyssa Huang 2024-05-03 10:44:37 -07:00 committed by GitHub
parent 2c0b8b6920
commit 1fd39150aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 18 additions and 4 deletions

View File

@ -50,9 +50,10 @@ import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.{Assumptions, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.function.Executable
import org.slf4j.{Logger, LoggerFactory}
import java.util
@ -612,7 +613,12 @@ class ZkMigrationIntegrationTest {
val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
// Allocate a block of producer IDs while in ZK mode
val nextProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS)
var nextProducerId = -1L
TestUtils.retry(60000) {
assertDoesNotThrow((() => nextProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(20, TimeUnit.SECONDS)): Executable)
}
assertEquals(0, nextProducerId)
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
@ -645,7 +651,11 @@ class ZkMigrationIntegrationTest {
log.info("Verifying metadata changes with ZK")
verifyTopicConfigs(zkClient)
verifyClientQuotas(zkClient)
val nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS)
var nextKRaftProducerId = -1L
TestUtils.retry(60000) {
assertDoesNotThrow((() => nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(20, TimeUnit.SECONDS)): Executable)
}
assertNotEquals(nextProducerId, nextKRaftProducerId)
} finally {
@ -970,7 +980,11 @@ class ZkMigrationIntegrationTest {
override def onComplete(response: ClientResponse): Unit = {
val body = response.responseBody().asInstanceOf[AllocateProducerIdsResponse]
producerIdStart.complete(body.data().producerIdStart())
if (body.data().errorCode() != 0) {
producerIdStart.completeExceptionally(new RuntimeException(s"Received error code ${body.data().errorCode()}"))
} else {
producerIdStart.complete(body.data().producerIdStart())
}
}
})
producerIdStart