KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)

These Kafka*BackingStore classes used in Connect have a recently-added deprecated constructor, which is not used within AK. However, this commit corrects a AdminClient resource leak if those deprecated constructors are used outside of AK. The fix simply ensures that the AdminClient created by the “default” supplier is always closed when the Kafka*BackingStore is stopped.

Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Randall Hauch 2021-02-19 11:49:56 -06:00 committed by GitHub
parent b17f70ed66
commit c75a73862a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 6 deletions

View File

@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -226,6 +227,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;
private SharedTopicAdmin ownTopicAdmin;
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
@ -291,7 +293,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
@Override
public void stop() {
log.info("Closing KafkaConfigBackingStore");
try {
configLog.stop();
} finally {
if (ownTopicAdmin != null) {
ownTopicAdmin.close();
}
}
log.info("Closed KafkaConfigBackingStore");
}
@ -479,7 +487,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Supplier<TopicAdmin> adminSupplier;
if (topicAdminSupplier != null) {
adminSupplier = topicAdminSupplier;
} else {
// Create our own topic admin supplier that we'll close when we're stopped
ownTopicAdmin = new SharedTopicAdmin(adminProps);
adminSupplier = ownTopicAdmin;
}
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).configStorageTopicSettings()
: Collections.emptyMap();

View File

@ -32,6 +32,7 @@ import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConvertingFutureCallback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,6 +66,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
private KafkaBasedLog<byte[], byte[]> offsetLog;
private HashMap<ByteBuffer, ByteBuffer> data;
private final Supplier<TopicAdmin> topicAdminSupplier;
private SharedTopicAdmin ownTopicAdmin;
@Deprecated
public KafkaOffsetBackingStore() {
@ -98,7 +100,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Supplier<TopicAdmin> adminSupplier;
if (topicAdminSupplier != null) {
adminSupplier = topicAdminSupplier;
} else {
// Create our own topic admin supplier that we'll close when we're stopped
ownTopicAdmin = new SharedTopicAdmin(adminProps);
adminSupplier = ownTopicAdmin;
}
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).offsetStorageTopicSettings()
: Collections.emptyMap();
@ -140,7 +149,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
@Override
public void stop() {
log.info("Stopping KafkaOffsetBackingStore");
try {
offsetLog.stop();
} finally {
if (ownTopicAdmin != null) {
ownTopicAdmin.close();
}
}
log.info("Stopped KafkaOffsetBackingStore");
}

View File

@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.Table;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
@ -134,6 +135,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
private String statusTopic;
private KafkaBasedLog<String, byte[]> kafkaLog;
private int generation;
private SharedTopicAdmin ownTopicAdmin;
@Deprecated
public KafkaStatusBackingStore(Time time, Converter converter) {
@ -177,7 +179,14 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Supplier<TopicAdmin> adminSupplier;
if (topicAdminSupplier != null) {
adminSupplier = topicAdminSupplier;
} else {
// Create our own topic admin supplier that we'll close when we're stopped
ownTopicAdmin = new SharedTopicAdmin(adminProps);
adminSupplier = ownTopicAdmin;
}
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).statusStorageTopicSettings()
@ -221,7 +230,13 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
@Override
public void stop() {
try {
kafkaLog.stop();
} finally {
if (ownTopicAdmin != null) {
ownTopicAdmin.close();
}
}
}
@Override