From 9ddd58bd6c0444327b08385b51af7b4f28d1cff7 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Thu, 13 Jun 2024 05:43:33 +0200 Subject: [PATCH] MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306) Reviewers: Greg Harris --- .../ExactlyOnceSourceIntegrationTest.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 6d4b648201a..84ee814ae40 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -99,6 +99,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXA import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR; import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL; import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -747,9 +748,18 @@ public class ExactlyOnceSourceIntegrationTest { workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic); startConnect(); - EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(1, brokerProps); + + int numConnectorTargetedBrokers = 1; + EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(numConnectorTargetedBrokers, brokerProps); try (Closeable clusterShutdown = connectorTargetedCluster::stop) { connectorTargetedCluster.start(); + // Wait for the connector-targeted Kafka cluster to get on its feet + waitForCondition( + () -> connectorTargetedCluster.runningBrokers().size() == numConnectorTargetedBrokers, + ConnectAssertions.WORKER_SETUP_DURATION_MS, + "Separate Kafka cluster did not start in time" + ); + String topic = "test-topic"; connectorTargetedCluster.createTopic(topic, 3); @@ -777,6 +787,11 @@ public class ExactlyOnceSourceIntegrationTest { // start a source connector connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "connector and tasks did not start in time" + ); log.info("Waiting for records to be provided to worker by task"); // wait for the connector tasks to produce enough records