KAFKA-14443: Close topic creation Admin clients in MM2 connectors (#12955)

Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2022-12-07 13:59:46 -08:00 committed by GitHub
parent 854dfb5ffc
commit 83732489ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 14 deletions

View File

@ -18,7 +18,6 @@
package org.apache.kafka.connect.mirror; package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@ -72,7 +71,7 @@ public class MirrorClient implements AutoCloseable {
} }
// for testing // for testing
MirrorClient(ForwardingAdmin adminClient, ReplicationPolicy replicationPolicy, MirrorClient(Admin adminClient, ReplicationPolicy replicationPolicy,
Map<String, Object> consumerConfig) { Map<String, Object> consumerConfig) {
this.adminClient = adminClient; this.adminClient = adminClient;
this.replicationPolicy = replicationPolicy; this.replicationPolicy = replicationPolicy;

View File

@ -48,6 +48,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
private MirrorCheckpointConfig config; private MirrorCheckpointConfig config;
private GroupFilter groupFilter; private GroupFilter groupFilter;
private Admin sourceAdminClient; private Admin sourceAdminClient;
private Admin targetAdminClient;
private SourceAndTarget sourceAndTarget; private SourceAndTarget sourceAndTarget;
private List<String> knownConsumerGroups = Collections.emptyList(); private List<String> knownConsumerGroups = Collections.emptyList();
@ -71,6 +72,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias()); sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
groupFilter = config.groupFilter(); groupFilter = config.groupFilter();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout()); scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout());
scheduler.execute(this::createInternalTopics, "creating internal topics"); scheduler.execute(this::createInternalTopics, "creating internal topics");
scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups"); scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
@ -88,6 +90,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
Utils.closeQuietly(scheduler, "scheduler"); Utils.closeQuietly(scheduler, "scheduler");
Utils.closeQuietly(groupFilter, "group filter"); Utils.closeQuietly(groupFilter, "group filter");
Utils.closeQuietly(sourceAdminClient, "source admin client"); Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
} }
@Override @Override
@ -159,8 +162,11 @@ public class MirrorCheckpointConnector extends SourceConnector {
} }
private void createInternalTopics() { private void createInternalTopics() {
MirrorUtils.createSinglePartitionCompactedTopic(config.checkpointsTopic(), MirrorUtils.createSinglePartitionCompactedTopic(
config.checkpointsTopicReplicationFactor(), config.forwardingAdmin(config.targetAdminConfig())); config.checkpointsTopic(),
config.checkpointsTopicReplicationFactor(),
targetAdminClient
);
} }
boolean shouldReplicate(String group) { boolean shouldReplicate(String group) {

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.connect.mirror; package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
@ -33,6 +34,7 @@ import java.util.Collections;
public class MirrorHeartbeatConnector extends SourceConnector { public class MirrorHeartbeatConnector extends SourceConnector {
private MirrorHeartbeatConfig config; private MirrorHeartbeatConfig config;
private Scheduler scheduler; private Scheduler scheduler;
private Admin targetAdminClient;
public MirrorHeartbeatConnector() { public MirrorHeartbeatConnector() {
// nop // nop
@ -46,6 +48,7 @@ public class MirrorHeartbeatConnector extends SourceConnector {
@Override @Override
public void start(Map<String, String> props) { public void start(Map<String, String> props) {
config = new MirrorHeartbeatConfig(props); config = new MirrorHeartbeatConfig(props);
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout()); scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout());
scheduler.execute(this::createInternalTopics, "creating internal topics"); scheduler.execute(this::createInternalTopics, "creating internal topics");
} }
@ -53,6 +56,7 @@ public class MirrorHeartbeatConnector extends SourceConnector {
@Override @Override
public void stop() { public void stop() {
Utils.closeQuietly(scheduler, "scheduler"); Utils.closeQuietly(scheduler, "scheduler");
Utils.closeQuietly(targetAdminClient, "target admin client");
} }
@Override @Override
@ -82,7 +86,10 @@ public class MirrorHeartbeatConnector extends SourceConnector {
} }
private void createInternalTopics() { private void createInternalTopics() {
MirrorUtils.createSinglePartitionCompactedTopic(config.heartbeatsTopic(), MirrorUtils.createSinglePartitionCompactedTopic(
config.heartbeatsTopicReplicationFactor(), config.forwardingAdmin(config.targetAdminConfig())); config.heartbeatsTopic(),
config.heartbeatsTopicReplicationFactor(),
targetAdminClient
);
} }
} }

View File

@ -18,7 +18,7 @@ package org.apache.kafka.connect.mirror;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.kafka.clients.admin.ForwardingAdmin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
@ -80,8 +80,9 @@ public class MirrorSourceConnector extends SourceConnector {
private List<TopicPartition> knownTargetTopicPartitions = Collections.emptyList(); private List<TopicPartition> knownTargetTopicPartitions = Collections.emptyList();
private ReplicationPolicy replicationPolicy; private ReplicationPolicy replicationPolicy;
private int replicationFactor; private int replicationFactor;
private ForwardingAdmin sourceAdminClient; private Admin sourceAdminClient;
private ForwardingAdmin targetAdminClient; private Admin targetAdminClient;
private Admin offsetSyncsAdminClient;
public MirrorSourceConnector() { public MirrorSourceConnector() {
// nop // nop
@ -117,6 +118,7 @@ public class MirrorSourceConnector extends SourceConnector {
replicationFactor = config.replicationFactor(); replicationFactor = config.replicationFactor();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());
scheduler = new Scheduler(MirrorSourceConnector.class, config.adminTimeout()); scheduler = new Scheduler(MirrorSourceConnector.class, config.adminTimeout());
scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic"); scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions"); scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
@ -142,6 +144,7 @@ public class MirrorSourceConnector extends SourceConnector {
Utils.closeQuietly(configPropertyFilter, "config property filter"); Utils.closeQuietly(configPropertyFilter, "config property filter");
Utils.closeQuietly(sourceAdminClient, "source admin client"); Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client"); Utils.closeQuietly(targetAdminClient, "target admin client");
Utils.closeQuietly(offsetSyncsAdminClient, "offset syncs admin client");
log.info("Stopping {} took {} ms.", connectorName, System.currentTimeMillis() - start); log.info("Stopping {} took {} ms.", connectorName, System.currentTimeMillis() - start);
} }
@ -306,9 +309,10 @@ public class MirrorSourceConnector extends SourceConnector {
} }
private void createOffsetSyncsTopic() { private void createOffsetSyncsTopic() {
MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), MirrorUtils.createSinglePartitionCompactedTopic(
config.offsetSyncsTopic(),
config.offsetSyncsTopicReplicationFactor(), config.offsetSyncsTopicReplicationFactor(),
config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()) offsetSyncsAdminClient
); );
} }
@ -393,7 +397,7 @@ public class MirrorSourceConnector extends SourceConnector {
})); }));
} }
private Set<String> listTopics(ForwardingAdmin adminClient) private Set<String> listTopics(Admin adminClient)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
return adminClient.listTopics().names().get(); return adminClient.listTopics().names().get();
} }
@ -403,7 +407,7 @@ public class MirrorSourceConnector extends SourceConnector {
return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get(); return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
} }
private static Collection<TopicDescription> describeTopics(ForwardingAdmin adminClient, Collection<String> topics) private static Collection<TopicDescription> describeTopics(Admin adminClient, Collection<String> topics)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
return adminClient.describeTopics(topics).allTopicNames().get().values(); return adminClient.describeTopics(topics).allTopicNames().get().values();
} }