KAFKA-13950: Fix resource leak in error scenarios (#12228)

We do not properly close Closeable resources in several places in the code base, especially on exception paths. This change fixes a number of these leaks.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
Author: Divij Vaidya, 2023-12-21 13:47:22 +01:00 (committed by GitHub)
parent 75dcc8dadf
commit 6250049e10
14 changed files with 102 additions and 47 deletions
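The fix follows one pattern throughout: a factory method acquires a Closeable, then hands it to an object that owns and closes it from then on; if construction fails in between, the factory itself must close what it acquired. A minimal, self-contained sketch of that shape (Resource, Wrapper, and ResourceFactory are illustrative names, not classes from this change):

    import java.io.Closeable;
    import java.io.IOException;

    // Illustrative stand-ins for e.g. PlaintextTransportLayer / KafkaChannel /
    // PlaintextChannelBuilder in the diffs below.
    class Resource implements Closeable {
        @Override
        public void close() { /* release the underlying handle */ }
    }

    class Wrapper implements Closeable {
        private final Resource resource;
        Wrapper(Resource resource) { this.resource = resource; }  // takes ownership
        @Override
        public void close() { resource.close(); }
    }

    class ResourceFactory {
        Wrapper build() throws IOException {
            Resource resource = null;           // declared outside try so the catch block can reach it
            try {
                resource = new Resource();      // acquire
                return new Wrapper(resource);   // on success, Wrapper owns it and closes it later
            } catch (Exception e) {
                if (resource != null)
                    resource.close();           // on failure, nobody else will close it
                throw new IOException("could not build wrapper", e);
            }
        }
    }

Each channel builder changed below (plaintext, SASL, SSL) is an instance of this shape, with Utils.closeQuietly standing in for the close() call in the catch block.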

clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java

@@ -34,6 +34,7 @@ public interface ChannelBuilder extends AutoCloseable, Configurable {
      * @param key SelectionKey
      * @param maxReceiveSize max size of a single receive buffer to allocate
      * @param memoryPool memory pool from which to allocate buffers, or null for none
+     * @param metadataRegistry registry which stores the metadata about the channels
      * @return KafkaChannel
      */
     KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,

clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java

@@ -54,12 +54,17 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
     @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
                                      MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
+        PlaintextTransportLayer transportLayer = null;
         try {
-            PlaintextTransportLayer transportLayer = buildTransportLayer(key);
-            Supplier<Authenticator> authenticatorCreator = () -> new PlaintextAuthenticator(configs, transportLayer, listenerName);
+            transportLayer = buildTransportLayer(key);
+            final PlaintextTransportLayer finalTransportLayer = transportLayer;
+            Supplier<Authenticator> authenticatorCreator = () -> new PlaintextAuthenticator(configs, finalTransportLayer, listenerName);
             return buildChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry);
         } catch (Exception e) {
+            // Ideally these resources are closed by the KafkaChannel, but if an error prevents the
+            // KafkaChannel from being created, this builder must close the resources instead.
+            Utils.closeQuietly(transportLayer, "transport layer for channel Id: " + id);
             throw new KafkaException(e);
         }
     }
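Utils.closeQuietly is used here rather than a bare close() so that a failure while closing cannot mask the original exception. Roughly what it does, as a simplified sketch (not the exact implementation in org.apache.kafka.common.utils.Utils, which logs through SLF4J):

    // Simplified sketch of Utils.closeQuietly(AutoCloseable, String): any
    // exception thrown by close() is logged and swallowed, so the KafkaException
    // already propagating from the catch block above is not replaced.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Throwable t) {
                System.err.println("Failed to close " + name + ": " + t);
            }
        }
    }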

clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java

@@ -210,16 +210,18 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
     @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
                                      MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
+        TransportLayer transportLayer = null;
         try {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             Socket socket = socketChannel.socket();
-            TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel, metadataRegistry);
+            transportLayer = buildTransportLayer(id, key, socketChannel, metadataRegistry);
+            final TransportLayer finalTransportLayer = transportLayer;
             Supplier<Authenticator> authenticatorCreator;
             if (mode == Mode.SERVER) {
                 authenticatorCreator = () -> buildServerAuthenticator(configs,
                         Collections.unmodifiableMap(saslCallbackHandlers),
                         id,
-                        transportLayer,
+                        finalTransportLayer,
                         Collections.unmodifiableMap(subjects),
                         Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism),
                         metadataRegistry);
@@ -230,12 +232,15 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
                         id,
                         socket.getInetAddress().getHostName(),
                         loginManager.serviceName(),
-                        transportLayer,
+                        finalTransportLayer,
                         subjects.get(clientSaslMechanism));
             }
             return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry);
         } catch (Exception e) {
+            // Ideally these resources are closed by the KafkaChannel, but if an error prevents the
+            // KafkaChannel from being created, this builder must close the resources instead.
+            Utils.closeQuietly(transportLayer, "transport layer for channel Id: " + id);
             throw new KafkaException(e);
         }
     }

clients/src/main/java/org/apache/kafka/common/network/Selectable.java

@@ -17,6 +17,8 @@
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.memory.MemoryPool;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -78,6 +80,10 @@ public interface Selectable {
     /**
      * The collection of receives that completed on the last {@link #poll(long) poll()} call.
+     *
+     * Note that the caller of this method assumes responsibility to close the NetworkReceive resources, which may be
+     * backed by a {@link MemoryPool}. In such scenarios (when a NetworkReceive uses a {@link MemoryPool}), it is
+     * necessary to close the {@link NetworkReceive} to prevent memory leaks.
      */
     Collection<NetworkReceive> completedReceives();
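A sketch of a caller honoring the documented contract; process(...) is a hypothetical handler, and closing each NetworkReceive is what returns a pool-backed buffer to the MemoryPool:

    // Hypothetical poll loop: every NetworkReceive handed out by
    // completedReceives() is closed once its payload has been handled.
    for (NetworkReceive receive : selector.completedReceives()) {
        try {
            process(receive.payload());   // application-specific handling (assumed)
        } finally {
            Utils.closeQuietly(receive, "network receive " + receive.source());
        }
    }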

clients/src/main/java/org/apache/kafka/common/network/Selector.java

@@ -334,9 +334,10 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
+        ChannelMetadataRegistry metadataRegistry = null;
         try {
-            KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool,
-                new SelectorChannelMetadataRegistry());
+            metadataRegistry = new SelectorChannelMetadataRegistry();
+            KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool, metadataRegistry);
             key.attach(channel);
             return channel;
         } catch (Exception e) {
@@ -345,6 +346,9 @@ public class Selector implements Selectable, AutoCloseable {
             } finally {
                 key.cancel();
             }
+            // Ideally, these resources are closed by the KafkaChannel, but if the KafkaChannel is not created
+            // due to an error, the Selector should close the resources it has created instead.
+            Utils.closeQuietly(metadataRegistry, "metadataRegistry");
             throw new IOException("Channel could not be created for socket " + socketChannel, e);
         }
     }

clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java

@@ -101,13 +101,18 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
     @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
                                      MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
+        SslTransportLayer transportLayer = null;
         try {
-            SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, metadataRegistry);
+            transportLayer = buildTransportLayer(sslFactory, id, key, metadataRegistry);
+            final SslTransportLayer finalTransportLayer = transportLayer;
             Supplier<Authenticator> authenticatorCreator = () ->
-                new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper);
+                new SslAuthenticator(configs, finalTransportLayer, listenerName, sslPrincipalMapper);
             return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry);
         } catch (Exception e) {
+            // Ideally these resources are closed by the KafkaChannel, but if an error prevents the
+            // KafkaChannel from being created, this builder must close the resources instead.
+            Utils.closeQuietly(transportLayer, "transport layer for channel Id: " + id);
             throw new KafkaException(e);
         }
     }

clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java

@@ -385,8 +385,7 @@ public final class LegacyRecord {
                                      ByteBuffer value,
                                      CompressionType compressionType,
                                      TimestampType timestampType) {
-        try {
-            DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+        try (DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer))) {
             write(out, magic, timestamp, key, value, compressionType, timestampType);
         } catch (IOException e) {
             throw new KafkaException(e);
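The same idiom in isolation, using only JDK classes: try-with-resources closes the stream on both the success path and the exception path, which the old hand-rolled try block did not.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class WriteExample {
        static byte[] serialize(int value) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // out.close() runs even if writeInt throws, so nothing leaks
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeInt(value);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return buffer.toByteArray();
        }
    }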

clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java

@@ -194,14 +194,12 @@ public class MemoryRecords extends AbstractRecords {
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
-                    final MemoryRecordsBuilder builder;
                     long deleteHorizonMs;
                     if (needToSetDeleteHorizon)
                         deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
                     else
                         deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
-                    builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
-                    MemoryRecords records = builder.build();
-                    int filteredBatchSize = records.sizeInBytes();
-                    if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
+                    try (final MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs)) {
+                        MemoryRecords records = builder.build();
+                        int filteredBatchSize = records.sizeInBytes();
+                        if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
@@ -214,6 +212,7 @@ public class MemoryRecords extends AbstractRecords {
                         filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
                             maxOffset, retainedRecords.size(), filteredBatchSize);
                     }
+                    }
                 } else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
                     if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
                         throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
@@ -677,13 +676,14 @@ public class MemoryRecords extends AbstractRecords {
         long logAppendTime = RecordBatch.NO_TIMESTAMP;
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType,
-                initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false,
-                partitionLeaderEpoch, sizeEstimate);
-        for (SimpleRecord record : records)
-            builder.append(record);
-        return builder.build();
+        try (final MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType,
+                initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false,
+                partitionLeaderEpoch, sizeEstimate)) {
+            for (SimpleRecord record : records)
+                builder.append(record);
+            return builder.build();
+        }
     }
 
     public static MemoryRecords withEndTransactionMarker(long producerId, short producerEpoch, EndTransactionMarker marker) {
         return withEndTransactionMarker(0L, System.currentTimeMillis(), RecordBatch.NO_PARTITION_LEADER_EPOCH,

clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java

@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.network;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
@@ -32,6 +31,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedConstruction;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -66,9 +66,13 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -424,21 +428,27 @@ public class SelectorTest {
     @Test
     public void registerFailure() throws Exception {
-        ChannelBuilder channelBuilder = new PlaintextChannelBuilder(null) {
-            @Override
-            public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
-                                             MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
-                throw new RuntimeException("Test exception");
-            }
-        };
-        Selector selector = new Selector(CONNECTION_MAX_IDLE_MS, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
-        SocketChannel socketChannel = SocketChannel.open();
-        socketChannel.configureBlocking(false);
-        IOException e = assertThrows(IOException.class, () -> selector.register("1", socketChannel));
-        assertTrue(e.getCause().getMessage().contains("Test exception"), "Unexpected exception: " + e);
-        assertFalse(socketChannel.isOpen(), "Socket not closed");
-        selector.close();
+        final String channelId = "1";
+        final ChannelBuilder channelBuilder = mock(ChannelBuilder.class);
+        when(channelBuilder.buildChannel(eq(channelId), any(SelectionKey.class), anyInt(), any(MemoryPool.class),
+            any(ChannelMetadataRegistry.class))).thenThrow(new RuntimeException("Test exception"));
+        try (MockedConstruction<Selector.SelectorChannelMetadataRegistry> mockedMetadataRegistry =
+                 mockConstruction(Selector.SelectorChannelMetadataRegistry.class)) {
+            Selector selector = new Selector(CONNECTION_MAX_IDLE_MS, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
+            final SocketChannel socketChannel = SocketChannel.open();
+            socketChannel.configureBlocking(false);
+            IOException e = assertThrows(IOException.class, () -> selector.register(channelId, socketChannel));
+            assertTrue(e.getCause().getMessage().contains("Test exception"), "Unexpected exception: " + e);
+            assertFalse(socketChannel.isOpen(), "Socket not closed");
+            // Ideally, the metadataRegistry is closed by the KafkaChannel, but if the KafkaChannel is not
+            // created due to an error, as in this case, the Selector should close the metadataRegistry instead.
+            verify(mockedMetadataRegistry.constructed().get(0)).close();
            selector.close();
+        }
     }
 
     @Test
     public void testCloseOldestConnection() throws Exception {
core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java

@@ -181,12 +181,15 @@ public class ReplicaManagerBuilder {
     public ReplicaManager build() {
         if (config == null) config = new KafkaConfig(Collections.emptyMap());
-        if (metrics == null) metrics = new Metrics();
         if (logManager == null) throw new RuntimeException("You must set logManager");
         if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
         if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
         if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager");
         if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(Optional.of(config));
+        // Initialize metrics at the end, just before passing it to ReplicaManager, to ensure ReplicaManager
+        // closes the metrics correctly. There would be a resource leak if metrics were initialized earlier and
+        // an exception occurred between its initialization and the creation of the ReplicaManager.
+        if (metrics == null) metrics = new Metrics();
         return new ReplicaManager(config,
             metrics,
             time,
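The ReplicaManagerBuilder change is purely an ordering fix, and the rule it applies generalizes: run every validation that can throw before constructing any closeable member, so no failure window exists between acquiring a resource and handing it to its owner. A toy illustration (Widget, Owner, and OwnerBuilder are hypothetical names):

    final class Widget implements AutoCloseable {
        @Override public void close() { }
    }

    final class Owner implements AutoCloseable {
        private final Widget widget;
        Owner(Widget widget) { this.widget = widget; }   // takes ownership
        @Override public void close() { widget.close(); }
    }

    final class OwnerBuilder {
        private String name;
        OwnerBuilder name(String n) { this.name = n; return this; }

        Owner build() {
            if (name == null)
                throw new IllegalStateException("You must set name"); // all checks run first
            Widget widget = new Widget();   // closeable created last, right before handoff
            return new Owner(widget);       // trivial constructor: nothing can fail in between
        }
    }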

examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java

@@ -48,7 +48,7 @@ import static java.util.Collections.singleton;
 /**
  * This class implements a read-process-write application.
  */
-public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
+public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener, AutoCloseable {
     private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
@@ -214,4 +214,15 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
             }
         }).sum();
     }
+
+    @Override
+    public void close() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
 }
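With AutoCloseable implemented, a caller can scope the processor's producer and consumer with try-with-resources. A hedged usage sketch; the constructor arguments shown are illustrative, not the example's exact signature, and the enclosing code must handle Exception since close() declares it:

    // Hypothetical wiring; start() and join() come from Thread, close() from
    // the new AutoCloseable implementation above.
    try (ExactlyOnceMessageProcessor processor =
             new ExactlyOnceMessageProcessor("localhost:9092", "input-topic", "output-topic")) {
        processor.start();   // run the read-process-write loop
        processor.join();    // wait for it to finish
    }                        // close() shuts down the producer and consumer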

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -42,6 +42,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -167,7 +168,6 @@ public class KafkaStreams implements AutoCloseable {
     private final Metrics metrics;
     protected final StreamsConfig applicationConfigs;
     protected final List<StreamThread> threads;
-    protected final StateDirectory stateDirectory;
     protected final StreamsMetadataState streamsMetadataState;
     private final ScheduledExecutorService stateDirCleaner;
     private final ScheduledExecutorService rocksDBMetricsRecordingService;
@@ -184,6 +184,7 @@ public class KafkaStreams implements AutoCloseable {
     private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
     GlobalStreamThread globalStreamThread;
+    protected StateDirectory stateDirectory = null;
     private KafkaStreams.StateListener stateListener;
     private boolean oldHandler;
     private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
@@ -974,6 +975,7 @@ public class KafkaStreams implements AutoCloseable {
             stateDirectory = new StateDirectory(applicationConfigs, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies());
             processId = stateDirectory.initializeProcessId();
         } catch (final ProcessorStateException fatal) {
+            Utils.closeQuietly(stateDirectory, "streams state directory");
             throw new StreamsException(fatal);
         }

streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

@@ -60,7 +60,7 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 * stored. Handles creation/locking/unlocking/cleaning of the Task Directories. This class is not
 * thread-safe.
 */
-public class StateDirectory {
+public class StateDirectory implements AutoCloseable {
     private static final Pattern TASK_DIR_PATH_NAME = Pattern.compile("\\d+_\\d+");
     private static final Pattern NAMED_TOPOLOGY_DIR_PATH_NAME = Pattern.compile("__.+__"); // named topology dirs follow '__Topology-Name__'
@@ -377,6 +377,7 @@ public class StateDirectory implements AutoCloseable {
         }
     }
 
+    @Override
     public void close() {
         if (hasPersistentStores) {
             try {

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java

@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -394,10 +395,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
                 consistencyEnabled,
                 position
             );
+            WriteBatch batch = null;
             try {
-                final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
+                batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
                 segment.addToBatch(new KeyValue<>(record.key(), record.value()), batch);
             } catch (final RocksDBException e) {
+                Utils.closeQuietly(batch, "rocksdb write batch");
                 throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
             }
         }
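The closeQuietly matters here because RocksDB's WriteBatch wraps native (off-heap) memory that the JVM garbage collector never reclaims. A reduced sketch of the error path, assuming the rocksdbjni dependency is on the classpath:

    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import static java.nio.charset.StandardCharsets.UTF_8;

    public class WriteBatchSketch {
        static WriteBatch buildBatch() {
            RocksDB.loadLibrary();                 // load the native library once
            WriteBatch batch = new WriteBatch();   // allocates a native handle
            try {
                batch.put("k".getBytes(UTF_8), "v".getBytes(UTF_8)); // may throw RocksDBException
                return batch;                      // success: the caller owns and closes it
            } catch (RocksDBException e) {
                batch.close();                     // failure: free the native handle here
                throw new IllegalStateException("Error building batch", e);
            }
        }
    }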