KAFKA-18923: resource leak in RSM fetchIndex inputStream (#19111)

Fix resource leak in RSM inputStream.

Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
PoAn Yang 2025-04-03 15:18:05 +08:00 committed by GitHub
parent 5c01fd0b76
commit be80e3cb8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 7 additions and 8 deletions

View File

@ -151,8 +151,8 @@ public class TierStateMachine {
private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
try (InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
return readBuffer.read();
}
@ -166,8 +166,9 @@ public class TierStateMachine {
File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
// Copy it to snapshot file in atomic manner.
Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING);
try (InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT)) {
Files.copy(inputStream, tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING);
}
Utils.atomicMoveWithFallback(tmpSnapshotFile, snapshotFile.toPath(), false);
// Reload producer snapshots.

View File

@ -463,8 +463,7 @@ public final class LocalTieredStorageTest {
* @param expected The expected content.
*/
public void verifyFetchedLogSegment(final RemoteLogSegmentId id, final int startPosition, final byte[] expected) {
try {
final InputStream in = remoteStorage.fetchLogSegment(newMetadata(id), startPosition);
try (final InputStream in = remoteStorage.fetchLogSegment(newMetadata(id), startPosition)) {
final ByteBuffer buffer = ByteBuffer.wrap(readFully(in));
Iterator<Record> records = MemoryRecords.readableRecords(buffer).records().iterator();
@ -529,8 +528,7 @@ public final class LocalTieredStorageTest {
private void verifyFileContents(final Function<RemoteLogSegmentMetadata, InputStream> actual,
final RemoteLogSegmentId id,
final byte[] expected) {
try {
final InputStream in = actual.apply(newMetadata(id));
try (final InputStream in = actual.apply(newMetadata(id))) {
assertArrayEquals(expected, readFully(in));
} catch (RemoteStorageException | IOException e) {
throw new AssertionError(e);