KAFKA-16684: Fix flaky DedicatedMirrorIntegrationTest (#15906)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Johnny Hsu 2024-05-11 02:42:26 +08:00 committed by GitHub
parent 147ea55dfe
commit 0cfc2983a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 9 additions and 3 deletions

View File

@ -28,6 +28,7 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
@ -259,10 +260,9 @@ public class DedicatedMirrorIntegrationTest {
// wait for heartbeat connector to start running
awaitConnectorTasksStart(mirrorMakers.get("node 0"), MirrorHeartbeatConnector.class, sourceAndTarget);
// Create one topic per Kafka cluster per MirrorMaker node
final int topicsPerCluster = numNodes;
final int messagesPerTopic = 10;
for (int i = 0; i < topicsPerCluster; i++) {
// Create one topic per Kafka cluster per MirrorMaker node
for (int i = 0; i < numNodes; i++) {
String topic = testTopicPrefix + i;
// Create the topic on cluster A
@ -353,6 +353,12 @@ public class DedicatedMirrorIntegrationTest {
.map(TaskInfo::config)
.allMatch(predicate);
} catch (Exception ex) {
if (ex instanceof RebalanceNeededException) {
// RebalanceNeededException should be retry-able.
// This happens when a worker has read a new config from the config topic, but hasn't completed the
// subsequent rebalance yet
throw ex;
}
log.error("Something unexpected occurred. Unable to get configuration of connector {} for mirror maker with source->target={}", connName, sourceAndTarget, ex);
throw new NoRetryException(ex);
}