KAFKA-15827: Prevent KafkaBasedLog subclasses from leaking passed-in clients (#14763)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Chris Egerton <chrise@aiven.io>
Greg Harris committed 2024-01-19 12:50:01 -08:00 (via GitHub)
parent 556dc2a93f
commit 93971465b6
6 changed files with 74 additions and 36 deletions
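
For context on the fix: KafkaBasedLog.withExistingClients wraps a producer, consumer, and TopicAdmin that the caller has already constructed. Before this commit, the work thread started by start() was the only thing responsible for closing the producer and consumer, so a log that was stopped without ever being started leaked both clients. A minimal, self-contained sketch of the failure mode and the fix (hypothetical classes, not code from this commit):

import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for a passed-in Kafka client; close() is idempotent.
class FakeClient implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);
    @Override
    public void close() {
        closed.set(true);
    }
}

class ClientOwningLog {
    private final FakeClient client;
    private Thread workThread;

    ClientOwningLog(FakeClient client) {
        this.client = client;
    }

    void start() {
        // The work thread owns the client and closes it when it exits.
        workThread = new Thread(() -> client.close());
        workThread.start();
    }

    void stop() {
        // The commit's fix in miniature: also close on stop(), so stopping a
        // log that was never started no longer leaks the client. Since close()
        // is idempotent, double-closing on the start-then-stop path is safe.
        client.close();
    }
}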

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java

@@ -18,7 +18,6 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -28,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
@@ -87,31 +85,16 @@ class OffsetSyncStore implements AutoCloseable {
     }
     private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
-        return new KafkaBasedLog<byte[], byte[]>(
+        return KafkaBasedLog.withExistingClients(
                 config.offsetSyncsTopic(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                () -> admin,
+                consumer,
+                null,
+                admin,
                 (error, record) -> this.handleRecord(record),
                 Time.SYSTEM,
-                ignored -> {
-                }
-        ) {
-            @Override
-            protected Producer<byte[], byte[]> createProducer() {
-                return null;
-            }
-            @Override
-            protected Consumer<byte[], byte[]> createConsumer() {
-                return consumer;
-            }
-            @Override
-            protected boolean readPartition(TopicPartition topicPartition) {
-                return topicPartition.partition() == 0;
-            }
-        };
+                ignored -> { },
+                topicPartition -> topicPartition.partition() == 0
+        );
     }
     OffsetSyncStore() {
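
With client cleanup handled by the factory, the only behavior OffsetSyncStore still customized was which partitions to read, and that collapses into the new Predicate<TopicPartition> parameter. The offset-syncs topic is expected to have a single partition, hence the partition-0 filter; stores that should tail every partition pass a constant-true predicate instead, as KafkaOffsetBackingStore does below. The two shapes side by side (illustrative constants, not code from this commit):

import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;

class ReadFilters {
    // MirrorMaker 2's offset-syncs store: consume only partition 0.
    static final Predicate<TopicPartition> PARTITION_ZERO_ONLY = tp -> tp.partition() == 0;

    // Backing stores that tail the whole topic: accept every partition.
    static final Predicate<TopicPartition> ALL_PARTITIONS = ignored -> true;
}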

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

@@ -156,7 +156,7 @@ public class Worker {
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
-    private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter;
+    private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter = Optional.empty();
     private final WorkerConfigTransformer workerConfigTransformer;
     private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
     private final Function<Map<String, Object>, Admin> adminFactory;
@@ -269,7 +269,9 @@ public class Worker {
         log.info("Worker stopped");
         workerMetricsGroup.close();
+        if (connectorStatusMetricsGroup != null) {
             connectorStatusMetricsGroup.close();
+        }
         workerConfigTransformer.close();
         ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
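
Both Worker changes harden stop() against running before, or without, start(): the Optional field is initialized eagerly so it can never be null when stop() dereferences it, and the connector status metrics group, which may never have been created, is null-checked before closing. The same pattern in a self-contained sketch (hypothetical class):

import java.util.Optional;

class StopSafeWorker {
    // Eagerly initialized: stop() before start() sees empty, never null.
    private Optional<AutoCloseable> committer = Optional.empty();
    private AutoCloseable metricsGroup; // only created in start()

    void start(AutoCloseable committer, AutoCloseable metricsGroup) {
        this.committer = Optional.of(committer);
        this.metricsGroup = metricsGroup;
    }

    void stop() throws Exception {
        if (committer.isPresent()) {
            committer.get().close();
        }
        if (metricsGroup != null) { // guard: may never have been created
            metricsGroup.close();
        }
    }
}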

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

@@ -384,9 +384,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             halt();
             log.info("Herder stopped");
-            herderMetrics.close();
         } catch (Throwable t) {
             log.error("Uncaught exception in herder work thread, exiting: ", t);
+            Utils.closeQuietly(this::stopServices, "herder services");
             Exit.exit(1);
         } finally {
             running = false;
@@ -787,6 +787,9 @@
         tasksToRestart.addAll(tasksToStop);
     }
+    /**
+     * Perform an orderly shutdown when triggered via {@link #stop()}
+     */
     // public for testing
     public void halt() {
         synchronized (this) {
@@ -795,8 +798,6 @@
             worker.stopAndAwaitConnectors();
             worker.stopAndAwaitTasks();
-            member.stop();
             // Explicitly fail any outstanding requests so they actually get a response and get an
             // understandable reason for their failure.
             DistributedHerderRequest request = requests.pollFirst();
@@ -804,7 +805,6 @@
                 request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
                 request = requests.pollFirst();
             }
-            stopServices();
         }
     }
@@ -814,10 +814,19 @@
         try {
             super.stopServices();
         } finally {
-            this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
+            closeResources();
         }
     }
+    /**
+     * Close resources managed by this herder but which are not explicitly started.
+     */
+    private void closeResources() {
+        Utils.closeQuietly(member::stop, "worker group member");
+        Utils.closeQuietly(herderMetrics::close, "herder metrics");
+        this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
+    }
     // Timeout for herderExecutor to gracefully terminate is set to a value to accommodate
     // reading to the end of the config topic + successfully attempting to stop all connectors and tasks and a buffer of 10s
     private long herderExecutorTimeoutMs() {
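
Factoring the cleanup into closeResources() lets both shutdown paths release the group member, the herder metrics, and any registered shutdown hooks: the orderly path through stopServices(), and the crash path in the work thread's catch block. Utils.closeQuietly logs a failed close() instead of propagating it, so one throwing resource cannot prevent the others from being released. Roughly (hypothetical resources; Utils is org.apache.kafka.common.utils.Utils):

import org.apache.kafka.common.utils.Utils;

public class CloseQuietlyDemo {
    public static void main(String[] args) {
        AutoCloseable failing = () -> {
            throw new IllegalStateException("boom");
        };
        AutoCloseable healthy = () -> System.out.println("closed");

        Utils.closeQuietly(failing, "herder metrics"); // failure is logged, not thrown
        Utils.closeQuietly(healthy, "worker group member"); // still runs
    }
}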

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java

@@ -99,7 +99,8 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore implements OffsetBackingStore {
                     topicAdmin,
                     consumedCallback,
                     Time.SYSTEM,
-                    topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM)
+                    topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM),
+                    ignored -> true
             );
         }
     };
@@ -131,7 +132,8 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore implements OffsetBackingStore {
                     topicAdmin,
                     consumedCallback,
                     Time.SYSTEM,
-                    topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM)
+                    topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM),
+                    ignored -> true
             );
         }
     };

connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java

@@ -54,6 +54,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -195,6 +196,7 @@ public class KafkaBasedLog<K, V> {
      * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
      * @param time Time interface
      * @param initializer the function that should be run when this log is {@link #start() started}; may be null
+     * @param readTopicPartition A predicate which returns true for each {@link TopicPartition} that should be read
      * @return a {@link KafkaBasedLog} using the given clients
      */
     public static <K, V> KafkaBasedLog<K, V> withExistingClients(String topic,
@@ -203,8 +205,11 @@ public class KafkaBasedLog<K, V> {
                                                                  TopicAdmin topicAdmin,
                                                                  Callback<ConsumerRecord<K, V>> consumedCallback,
                                                                  Time time,
-                                                                 java.util.function.Consumer<TopicAdmin> initializer) {
+                                                                 java.util.function.Consumer<TopicAdmin> initializer,
+                                                                 Predicate<TopicPartition> readTopicPartition
+    ) {
         Objects.requireNonNull(topicAdmin);
+        Objects.requireNonNull(readTopicPartition);
         return new KafkaBasedLog<K, V>(topic,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
@@ -222,6 +227,19 @@
             protected Consumer<K, V> createConsumer() {
                 return consumer;
             }
+            @Override
+            protected boolean readPartition(TopicPartition topicPartition) {
+                return readTopicPartition.test(topicPartition);
+            }
+            @Override
+            public void stop() {
+                super.stop();
+                // Close the clients here, if the thread that was responsible for closing them was never started.
+                Utils.closeQuietly(producer, "producer");
+                Utils.closeQuietly(consumer, "consumer");
+            }
         };
     }
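
Taken together, a caller of the expanded factory now looks roughly like this (placeholder topic name and callback; consumer, producer, and admin are assumed to be already-configured clients that the caller hands over to the log):

KafkaBasedLog<byte[], byte[]> log = KafkaBasedLog.withExistingClients(
        "example-topic",                // placeholder topic name
        consumer,                       // Consumer<byte[], byte[]>
        producer,                       // Producer<byte[], byte[]>
        admin,                          // TopicAdmin, must be non-null
        (error, record) -> { },         // consumed-record callback
        Time.SYSTEM,
        ignored -> { },                 // initializer run during start()
        tp -> tp.partition() == 0);     // read only partition 0

log.start(); // the work thread now owns and will eventually close the clients
log.stop();  // closes the producer and consumer even if start() never ran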

connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java

@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import static org.junit.Assert.assertEquals;
@@ -69,6 +70,7 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -122,6 +124,7 @@ public class KafkaBasedLogTest {
     private KafkaProducer<String, String> producer;
     private TopicAdmin admin;
     private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
+    private final Predicate<TopicPartition> predicate = ignored -> true;
     private MockConsumer<String, String> consumer;
     private final Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
@@ -480,10 +483,31 @@
         verify(admin).endOffsets(eq(tps));
     }
+    @Test
+    public void testWithExistingClientsStartAndStop() {
+        admin = mock(TopicAdmin.class);
+        store = KafkaBasedLog.withExistingClients(TOPIC, consumer, producer, admin, consumedCallback, time, initializer, predicate);
+        store.start();
+        store.stop();
+        verifyStartAndStop();
+    }
+    @Test
+    public void testWithExistingClientsStopOnly() {
+        admin = mock(TopicAdmin.class);
+        store = KafkaBasedLog.withExistingClients(TOPIC, consumer, producer, admin, consumedCallback, time, initializer, predicate);
+        store.stop();
+        verifyStop();
+    }
     private void verifyStartAndStop() {
         verify(initializer).accept(admin);
-        verify(producer).close();
-        assertTrue(consumer.closed());
+        verifyStop();
         assertFalse(store.thread.isAlive());
     }
+    private void verifyStop() {
+        verify(producer, atLeastOnce()).close();
+        assertTrue(consumer.closed());
+    }
 }
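
The switch from verify(producer).close() to verify(producer, atLeastOnce()).close() is deliberate: on the start-then-stop path the producer can now be closed twice, once by the work thread and once by the new stop() override, while on the stop-only path it is closed exactly once. A minimal illustration of why that verification mode fits both paths (hypothetical test, not from this commit):

@Test
public void closeMayBeInvokedMoreThanOnce() throws Exception {
    AutoCloseable resource = mock(AutoCloseable.class);
    resource.close();                        // the work thread's close
    resource.close();                        // the stop() override's close
    verify(resource, atLeastOnce()).close(); // passes for one or more calls
}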