From 0b57b36c8f95cd251243d0c38b210b0f815aa529 Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Thu, 8 Aug 2024 21:58:11 +0800 Subject: [PATCH] KAFKA-17232: Do not generate task configs in MirrorCheckpointConnector if initial consumer group load times out (#16767) Reviewers: Hongten , Chris Egerton --- .../mirror/MirrorCheckpointConnector.java | 21 ++++++++++++++----- .../mirror/MirrorCheckpointConnectorTest.java | 13 ++++++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java index 24c98b76d84..a387b695d8a 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.util.ConnectorUtils; @@ -63,7 +64,7 @@ public class MirrorCheckpointConnector extends SourceConnector { private Admin sourceAdminClient; private Admin targetAdminClient; private SourceAndTarget sourceAndTarget; - private Set knownConsumerGroups = Collections.emptySet(); + private Set knownConsumerGroups = null; public MirrorCheckpointConnector() { // nop @@ -81,7 +82,6 @@ public class MirrorCheckpointConnector extends SourceConnector { if (!config.enabled()) { return; } - String connectorName = config.connectorName(); sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias()); topicFilter = config.topicFilter(); groupFilter = config.groupFilter(); @@ -92,8 +92,6 @@ public class MirrorCheckpointConnector extends SourceConnector { scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups"); scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(), "refreshing consumer groups"); - log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size()); - log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups); } @Override @@ -133,6 +131,13 @@ public class MirrorCheckpointConnector extends SourceConnector { // divide consumer groups among tasks @Override public List> taskConfigs(int maxTasks) { + if (knownConsumerGroups == null) { + // If knownConsumerGroup is null, it means the initial loading has not finished. + // An exception should be thrown to trigger the retry behavior in the framework. + log.debug("Initial consumer loading has not yet completed"); + throw new RetriableException("Timeout while loading consumer groups."); + } + // if the replication is disabled, known consumer group is empty, or checkpoint emission is // disabled by setting 'emit.checkpoints.enabled' to false, the interval of checkpoint emission // will be negative and no 'MirrorCheckpointTask' will be created @@ -186,6 +191,9 @@ public class MirrorCheckpointConnector extends SourceConnector { private void refreshConsumerGroups() throws InterruptedException, ExecutionException { + // If loadInitialConsumerGroups fails for any reason(e.g., timeout), knownConsumerGroups may be null. + // We still want this method to recover gracefully in such cases. + Set knownConsumerGroups = this.knownConsumerGroups == null ? Collections.emptySet() : this.knownConsumerGroups; Set consumerGroups = findConsumerGroups(); Set newConsumerGroups = new HashSet<>(consumerGroups); newConsumerGroups.removeAll(knownConsumerGroups); @@ -196,14 +204,17 @@ public class MirrorCheckpointConnector extends SourceConnector { consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(), knownConsumerGroups.size()); log.debug("Found new consumer groups: {}", newConsumerGroups); - knownConsumerGroups = consumerGroups; + this.knownConsumerGroups = consumerGroups; context.requestTaskReconfiguration(); } } private void loadInitialConsumerGroups() throws InterruptedException, ExecutionException { + String connectorName = config.connectorName(); knownConsumerGroups = findConsumerGroups(); + log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size()); + log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups); } Set findConsumerGroups() diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java index 770ef683659..8a25bdaa5e4 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.junit.jupiter.api.Test; @@ -94,6 +95,18 @@ public class MirrorCheckpointConnectorTest { assertEquals(0, output.size(), "ConsumerGroup shouldn't exist"); } + @Test + public void testConsumerGroupInitializeTimeout() { + MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps()); + MirrorCheckpointConnector connector = new MirrorCheckpointConnector(null, config); + + assertThrows( + RetriableException.class, + () -> connector.taskConfigs(1), + "taskConfigs should throw exception when initial loading ConsumerGroup timeout" + ); + } + @Test public void testReplicationDisabled() { // disable the replication