mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)
Reviewers: Greg Harris <gharris1727@gmail.com>
This commit is contained in:
		
							parent
							
								
									91bd1baff0
								
							
						
					
					
						commit
						15b62351a1
					
				|  | @ -99,6 +99,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXA | ||||||
| import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR; | import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR; | ||||||
| import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL; | import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL; | ||||||
| import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL; | import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL; | ||||||
|  | import static org.apache.kafka.test.TestUtils.waitForCondition; | ||||||
| import static org.junit.Assert.assertEquals; | import static org.junit.Assert.assertEquals; | ||||||
| import static org.junit.Assert.assertFalse; | import static org.junit.Assert.assertFalse; | ||||||
| import static org.junit.Assert.assertNotNull; | import static org.junit.Assert.assertNotNull; | ||||||
|  | @ -747,9 +748,18 @@ public class ExactlyOnceSourceIntegrationTest { | ||||||
|         workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic); |         workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic); | ||||||
| 
 | 
 | ||||||
|         startConnect(); |         startConnect(); | ||||||
|         EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(1, brokerProps); | 
 | ||||||
|  |         int numConnectorTargetedBrokers = 1; | ||||||
|  |         EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(numConnectorTargetedBrokers, brokerProps); | ||||||
|         try (Closeable clusterShutdown = connectorTargetedCluster::stop) { |         try (Closeable clusterShutdown = connectorTargetedCluster::stop) { | ||||||
|             connectorTargetedCluster.start(); |             connectorTargetedCluster.start(); | ||||||
|  |             // Wait for the connector-targeted Kafka cluster to get on its feet | ||||||
|  |             waitForCondition( | ||||||
|  |                     () -> connectorTargetedCluster.runningBrokers().size() == numConnectorTargetedBrokers, | ||||||
|  |                     ConnectAssertions.WORKER_SETUP_DURATION_MS, | ||||||
|  |                     "Separate Kafka cluster did not start in time" | ||||||
|  |             ); | ||||||
|  | 
 | ||||||
|             String topic = "test-topic"; |             String topic = "test-topic"; | ||||||
|             connectorTargetedCluster.createTopic(topic, 3); |             connectorTargetedCluster.createTopic(topic, 3); | ||||||
| 
 | 
 | ||||||
|  | @ -777,6 +787,11 @@ public class ExactlyOnceSourceIntegrationTest { | ||||||
| 
 | 
 | ||||||
|             // start a source connector |             // start a source connector | ||||||
|             connect.configureConnector(CONNECTOR_NAME, props); |             connect.configureConnector(CONNECTOR_NAME, props); | ||||||
|  |             connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( | ||||||
|  |                     CONNECTOR_NAME, | ||||||
|  |                     numTasks, | ||||||
|  |                     "connector and tasks did not start in time" | ||||||
|  |             ); | ||||||
| 
 | 
 | ||||||
|             log.info("Waiting for records to be provided to worker by task"); |             log.info("Waiting for records to be provided to worker by task"); | ||||||
|             // wait for the connector tasks to produce enough records |             // wait for the connector tasks to produce enough records | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue