From be80e3cb8a0561cf933aec3638cceea8de1ffc22 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Thu, 3 Apr 2025 15:18:05 +0800
Subject: [PATCH] KAFKA-18923: resource leak in RSM fetchIndex inputStream (#19111)

Fix resource leak in RSM inputStream.

Reviewers: Luke Chen
---
 core/src/main/java/kafka/server/TierStateMachine.java      | 9 +++++----
 .../log/remote/storage/LocalTieredStorageTest.java         | 6 ++----
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java
index 39d0b511865..d6a3c5c6f35 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -151,8 +151,8 @@ public class TierStateMachine {
 
     private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
                                                        RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
-        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
-        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+        try (InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
             CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
             return readBuffer.read();
         }
@@ -166,8 +166,9 @@ public class TierStateMachine {
         File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
         Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
         // Copy it to snapshot file in atomic manner.
-        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
-                tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING);
+        try (InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT)) {
+            Files.copy(inputStream, tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING);
+        }
         Utils.atomicMoveWithFallback(tmpSnapshotFile, snapshotFile.toPath(), false);
 
         // Reload producer snapshots.
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index f1358b838cb..34566b9a12a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -463,8 +463,7 @@ public final class LocalTieredStorageTest {
      * @param expected      The expected content.
      */
     public void verifyFetchedLogSegment(final RemoteLogSegmentId id, final int startPosition, final byte[] expected) {
-        try {
-            final InputStream in = remoteStorage.fetchLogSegment(newMetadata(id), startPosition);
+        try (final InputStream in = remoteStorage.fetchLogSegment(newMetadata(id), startPosition)) {
             final ByteBuffer buffer = ByteBuffer.wrap(readFully(in));
             Iterator<Record> records = MemoryRecords.readableRecords(buffer).records().iterator();
 
@@ -529,8 +528,7 @@ public final class LocalTieredStorageTest {
     private void verifyFileContents(final Function<RemoteLogSegmentMetadata, InputStream> actual,
                                     final RemoteLogSegmentId id,
                                     final byte[] expected) {
-        try {
-            final InputStream in = actual.apply(newMetadata(id));
+        try (final InputStream in = actual.apply(newMetadata(id))) {
             assertArrayEquals(expected, readFully(in));
         } catch (RemoteStorageException | IOException e) {
             throw new AssertionError(e);
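
A note on the pattern (outside the patch itself): every hunk applies the same fix, moving the InputStream obtained from remote storage into a try-with-resources statement so the stream is closed deterministically instead of being leaked when it goes out of scope. Below is a minimal, self-contained sketch of that pattern; the fetchIndex() helper and the class name are hypothetical stand-ins for the RemoteStorageManager call, not Kafka APIs.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public class FetchIndexCloseSketch {

        // Hypothetical stand-in for rlm.storageManager().fetchIndex(...): any call
        // that hands back an open stream the caller is responsible for closing.
        static InputStream fetchIndex() {
            return new ByteArrayInputStream("0 0\n".getBytes(StandardCharsets.UTF_8));
        }

        public static void main(String[] args) throws IOException {
            // Pre-patch shape: the stream was created outside the try block, so an
            // exception thrown while reading could leave it unclosed (the leak).
            // Post-patch shape: declaring it in the resource list guarantees close()
            // runs whether the body completes normally or throws.
            try (InputStream in = fetchIndex()) {
                System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        }
    }

Resources declared in the same try-with-resources header are closed in reverse declaration order, which is why readLeaderEpochCheckpoint can list the InputStream first and the BufferedReader second: the reader is closed before the stream it wraps.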