KAFKA-17232: Do not generate task configs in MirrorCheckpointConnector if initial consumer group load times out (#16767)

Reviewers: Hongten <hongtenzone@foxmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
TengYao Chi 2024-08-08 21:58:11 +08:00 committed by GitHub
parent c9d415f361
commit 0dc74c5556
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 5 deletions

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
@ -63,7 +64,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
private Admin sourceAdminClient;
private Admin targetAdminClient;
private SourceAndTarget sourceAndTarget;
private Set<String> knownConsumerGroups = Collections.emptySet();
private Set<String> knownConsumerGroups = null;
public MirrorCheckpointConnector() {
// nop
@ -81,7 +82,6 @@ public class MirrorCheckpointConnector extends SourceConnector {
if (!config.enabled()) {
return;
}
String connectorName = config.connectorName();
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
topicFilter = config.topicFilter();
groupFilter = config.groupFilter();
@ -92,8 +92,6 @@ public class MirrorCheckpointConnector extends SourceConnector {
scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(),
"refreshing consumer groups");
log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size());
log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups);
}
@Override
@ -133,6 +131,13 @@ public class MirrorCheckpointConnector extends SourceConnector {
// divide consumer groups among tasks
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (knownConsumerGroups == null) {
// If knownConsumerGroup is null, it means the initial loading has not finished.
// An exception should be thrown to trigger the retry behavior in the framework.
log.debug("Initial consumer loading has not yet completed");
throw new RetriableException("Timeout while loading consumer groups.");
}
// if the replication is disabled, known consumer group is empty, or checkpoint emission is
// disabled by setting 'emit.checkpoints.enabled' to false, the interval of checkpoint emission
// will be negative and no 'MirrorCheckpointTask' will be created
@ -186,6 +191,9 @@ public class MirrorCheckpointConnector extends SourceConnector {
private void refreshConsumerGroups()
throws InterruptedException, ExecutionException {
// If loadInitialConsumerGroups fails for any reason(e.g., timeout), knownConsumerGroups may be null.
// We still want this method to recover gracefully in such cases.
Set<String> knownConsumerGroups = this.knownConsumerGroups == null ? Collections.emptySet() : this.knownConsumerGroups;
Set<String> consumerGroups = findConsumerGroups();
Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
newConsumerGroups.removeAll(knownConsumerGroups);
@ -196,14 +204,17 @@ public class MirrorCheckpointConnector extends SourceConnector {
consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
knownConsumerGroups.size());
log.debug("Found new consumer groups: {}", newConsumerGroups);
knownConsumerGroups = consumerGroups;
this.knownConsumerGroups = consumerGroups;
context.requestTaskReconfiguration();
}
}
private void loadInitialConsumerGroups()
throws InterruptedException, ExecutionException {
String connectorName = config.connectorName();
knownConsumerGroups = findConsumerGroups();
log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size());
log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups);
}
Set<String> findConsumerGroups()

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.junit.jupiter.api.Test;
@ -94,6 +95,18 @@ public class MirrorCheckpointConnectorTest {
assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
}
@Test
public void testConsumerGroupInitializeTimeout() {
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(null, config);
assertThrows(
RetriableException.class,
() -> connector.taskConfigs(1),
"taskConfigs should throw exception when initial loading ConsumerGroup timeout"
);
}
@Test
public void testReplicationDisabled() {
// disable the replication