mirror of https://github.com/apache/kafka.git
KAFKA-17232: Do not generate task configs in MirrorCheckpointConnector if initial consumer group load times out (#16767)
Reviewers: Hongten <hongtenzone@foxmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
parent
7fe3cec4eb
commit
0b57b36c8f
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.AppInfoParser;
|
|||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.connect.connector.Task;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.errors.RetriableException;
|
||||
import org.apache.kafka.connect.source.SourceConnector;
|
||||
import org.apache.kafka.connect.util.ConnectorUtils;
|
||||
|
||||
|
@ -63,7 +64,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
|
|||
private Admin sourceAdminClient;
|
||||
private Admin targetAdminClient;
|
||||
private SourceAndTarget sourceAndTarget;
|
||||
private Set<String> knownConsumerGroups = Collections.emptySet();
|
||||
private Set<String> knownConsumerGroups = null;
|
||||
|
||||
public MirrorCheckpointConnector() {
|
||||
// nop
|
||||
|
@ -81,7 +82,6 @@ public class MirrorCheckpointConnector extends SourceConnector {
|
|||
if (!config.enabled()) {
|
||||
return;
|
||||
}
|
||||
String connectorName = config.connectorName();
|
||||
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
|
||||
topicFilter = config.topicFilter();
|
||||
groupFilter = config.groupFilter();
|
||||
|
@ -92,8 +92,6 @@ public class MirrorCheckpointConnector extends SourceConnector {
|
|||
scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
|
||||
scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(),
|
||||
"refreshing consumer groups");
|
||||
log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size());
|
||||
log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -133,6 +131,13 @@ public class MirrorCheckpointConnector extends SourceConnector {
|
|||
// divide consumer groups among tasks
|
||||
@Override
|
||||
public List<Map<String, String>> taskConfigs(int maxTasks) {
|
||||
if (knownConsumerGroups == null) {
|
||||
// If knownConsumerGroups is null, it means the initial loading has not finished.
|
||||
// An exception should be thrown to trigger the retry behavior in the framework.
|
||||
log.debug("Initial consumer loading has not yet completed");
|
||||
throw new RetriableException("Timeout while loading consumer groups.");
|
||||
}
|
||||
|
||||
// if the replication is disabled, known consumer group is empty, or checkpoint emission is
|
||||
// disabled by setting 'emit.checkpoints.enabled' to false, the interval of checkpoint emission
|
||||
// will be negative and no 'MirrorCheckpointTask' will be created
|
||||
|
@ -186,6 +191,9 @@ public class MirrorCheckpointConnector extends SourceConnector {
|
|||
|
||||
private void refreshConsumerGroups()
|
||||
throws InterruptedException, ExecutionException {
|
||||
// If loadInitialConsumerGroups fails for any reason (e.g., timeout), knownConsumerGroups may be null.
|
||||
// We still want this method to recover gracefully in such cases.
|
||||
Set<String> knownConsumerGroups = this.knownConsumerGroups == null ? Collections.emptySet() : this.knownConsumerGroups;
|
||||
Set<String> consumerGroups = findConsumerGroups();
|
||||
Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
|
||||
newConsumerGroups.removeAll(knownConsumerGroups);
|
||||
|
@ -196,14 +204,17 @@ public class MirrorCheckpointConnector extends SourceConnector {
|
|||
consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
|
||||
knownConsumerGroups.size());
|
||||
log.debug("Found new consumer groups: {}", newConsumerGroups);
|
||||
knownConsumerGroups = consumerGroups;
|
||||
this.knownConsumerGroups = consumerGroups;
|
||||
context.requestTaskReconfiguration();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Performs the initial consumer group load: stores the result of
 * {@code findConsumerGroups()} in {@code knownConsumerGroups}, then logs the
 * connector name with the number of groups (info) and the groups themselves (debug).
 *
 * <p>{@code taskConfigs} throws a {@code RetriableException} while
 * {@code knownConsumerGroups} is still null, so task configs are only generated
 * once this assignment (or the one in {@code refreshConsumerGroups}) has happened.
 *
 * @throws InterruptedException presumably propagated from {@code findConsumerGroups()} - TODO confirm
 * @throws ExecutionException presumably propagated from {@code findConsumerGroups()} - TODO confirm
 */
private void loadInitialConsumerGroups()
|
||||
throws InterruptedException, ExecutionException {
|
||||
String connectorName = config.connectorName();
|
||||
// If findConsumerGroups() throws, the field is not assigned and keeps its previous value.
knownConsumerGroups = findConsumerGroups();
|
||||
log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size());
|
||||
log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups);
|
||||
}
|
||||
|
||||
Set<String> findConsumerGroups()
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupListing;
|
|||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.errors.RetriableException;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
|
@ -94,6 +95,18 @@ public class MirrorCheckpointConnectorTest {
|
|||
assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
|
||||
}
|
||||
|
||||
/**
 * Verifies that {@code taskConfigs} throws a {@code RetriableException} when the
 * connector is built with {@code null} as its first constructor argument.
 * NOTE(review): that argument is presumably the known consumer group set, with null
 * standing in for an initial load that never completed - confirm against the constructor.
 */
@Test
|
||||
public void testConsumerGroupInitializeTimeout() {
|
||||
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
|
||||
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(null, config);
|
||||

||||
// The call must fail with a retriable error rather than return a list of task configs.
assertThrows(
|
||||
RetriableException.class,
|
||||
() -> connector.taskConfigs(1),
|
||||
"taskConfigs should throw exception when initial loading ConsumerGroup timeout"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationDisabled() {
|
||||
// disable the replication
|
||||
|
|
Loading…
Reference in New Issue