MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)

Reviewers: Greg Harris <gharris1727@gmail.com>
This commit is contained in:
Chris Egerton 2024-06-13 05:43:33 +02:00 committed by GitHub
parent 0a203a9622
commit 9ddd58bd6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 16 additions and 1 deletions

View File

@ -99,6 +99,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXA
import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -747,9 +748,18 @@ public class ExactlyOnceSourceIntegrationTest {
workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic);
startConnect();
EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(1, brokerProps);
int numConnectorTargetedBrokers = 1;
EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(numConnectorTargetedBrokers, brokerProps);
try (Closeable clusterShutdown = connectorTargetedCluster::stop) {
connectorTargetedCluster.start();
// Wait for the connector-targeted Kafka cluster to get on its feet
waitForCondition(
() -> connectorTargetedCluster.runningBrokers().size() == numConnectorTargetedBrokers,
ConnectAssertions.WORKER_SETUP_DURATION_MS,
"Separate Kafka cluster did not start in time"
);
String topic = "test-topic";
connectorTargetedCluster.createTopic(topic, 3);
@ -777,6 +787,11 @@ public class ExactlyOnceSourceIntegrationTest {
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"connector and tasks did not start in time"
);
log.info("Waiting for records to be provided to worker by task");
// wait for the connector tasks to produce enough records