diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 33ec365cfc5..1e0e4f4d4fb 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -50,9 +50,10 @@ import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.server.ControllerRequestCompletionHandler import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock} import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ZkConfigs} -import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail} +import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail} import org.junit.jupiter.api.{Assumptions, Timeout} import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.function.Executable import org.slf4j.{Logger, LoggerFactory} import java.util @@ -612,7 +613,12 @@ class ZkMigrationIntegrationTest { val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3) // Allocate a block of producer IDs while in ZK mode - val nextProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS) + var nextProducerId = -1L + + TestUtils.retry(60000) { + assertDoesNotThrow((() => nextProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(20, TimeUnit.SECONDS)): Executable) + } + assertEquals(0, nextProducerId) // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") @@ -645,7 +651,11 @@ class ZkMigrationIntegrationTest { log.info("Verifying metadata changes with ZK") verifyTopicConfigs(zkClient) verifyClientQuotas(zkClient) - val nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS) + var nextKRaftProducerId = -1L + + TestUtils.retry(60000) { + assertDoesNotThrow((() => nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(20, TimeUnit.SECONDS)): Executable) + } assertNotEquals(nextProducerId, nextKRaftProducerId) } finally { @@ -970,7 +980,11 @@ class ZkMigrationIntegrationTest { override def onComplete(response: ClientResponse): Unit = { val body = response.responseBody().asInstanceOf[AllocateProducerIdsResponse] - producerIdStart.complete(body.data().producerIdStart()) + if (body.data().errorCode() != 0) { + producerIdStart.completeExceptionally(new RuntimeException(s"Received error code ${body.data().errorCode()}")) + } else { + producerIdStart.complete(body.data().producerIdStart()) + } } }) producerIdStart