diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index c905a4678a9..f70ee032346 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -292,17 +292,43 @@ public class FileRecords extends AbstractRecords implements Closeable { /** * Search forward for the file position of the message batch whose last offset that is greater * than or equal to the target offset. If no such batch is found, return null. + *

+ * The following logic is intentionally designed to minimize memory usage + * by avoiding unnecessary calls to {@link FileChannelRecordBatch#lastOffset()} for every batch. + * Instead, we use {@link FileChannelRecordBatch#baseOffset()} comparisons when possible, and only + * check {@link FileChannelRecordBatch#lastOffset()} when absolutely necessary. * * @param targetOffset The offset to search for. * @param startingPosition The starting position in the file to begin searching from. * @return the batch's base offset, its physical position, and its size (including log overhead) */ - public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) { + public LogOffsetPosition searchForOffsetFromPosition(long targetOffset, int startingPosition) { + FileChannelRecordBatch prevBatch = null; + for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) { - long offset = batch.lastOffset(); - if (offset >= targetOffset) - return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes()); + // Exact match: this batch starts at targetOffset, so return it without calling lastOffset + if (batch.baseOffset() == targetOffset) { + return LogOffsetPosition.fromBatch(batch); + } + + // First batch that starts past targetOffset: the answer is either the previous batch or this one + if (batch.baseOffset() > targetOffset) { + // The previous batch started before the target; return it if it also ends at or after the target + if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) + return LogOffsetPosition.fromBatch(prevBatch); + else { + // There is no previous batch, or the previous batch ends before the + // target, so return the current batch (which starts past the target) + return LogOffsetPosition.fromBatch(batch); + } + } + prevBatch = batch; } + // Reaching here means no batch had baseOffset >= targetOffset (including the case of no batches at all) + // Only the last batch seen, if any, can still contain the target + if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) + return LogOffsetPosition.fromBatch(prevBatch); + return null; } @@ -463,6 +489,10 @@ public class 
FileRecords extends AbstractRecords implements Closeable { public final long offset; public final int position; public final int size; + + public static LogOffsetPosition fromBatch(FileChannelRecordBatch batch) { + return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes()); + } public LogOffsetPosition(long offset, int position, int size) { this.offset = offset; diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 7574609ab99..a2e89d3f4c6 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import java.io.File; @@ -51,10 +53,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -248,25 +253,25 @@ public class FileRecordsTest { int message1Size = batches.get(0).sizeInBytes(); assertEquals(new FileRecords.LogOffsetPosition(0L, position, message1Size), - fileRecords.searchForOffsetWithSize(0, 0), + 
fileRecords.searchForOffsetFromPosition(0, 0), "Should be able to find the first message by its offset"); position += message1Size; int message2Size = batches.get(1).sizeInBytes(); assertEquals(new FileRecords.LogOffsetPosition(1L, position, message2Size), - fileRecords.searchForOffsetWithSize(1, 0), + fileRecords.searchForOffsetFromPosition(1, 0), "Should be able to find second message when starting from 0"); assertEquals(new FileRecords.LogOffsetPosition(1L, position, message2Size), - fileRecords.searchForOffsetWithSize(1, position), + fileRecords.searchForOffsetFromPosition(1, position), "Should be able to find second message starting from its offset"); position += message2Size + batches.get(2).sizeInBytes(); int message4Size = batches.get(3).sizeInBytes(); assertEquals(new FileRecords.LogOffsetPosition(50L, position, message4Size), - fileRecords.searchForOffsetWithSize(3, position), + fileRecords.searchForOffsetFromPosition(3, position), "Should be able to find fourth message from a non-existent offset"); assertEquals(new FileRecords.LogOffsetPosition(50L, position, message4Size), - fileRecords.searchForOffsetWithSize(50, position), + fileRecords.searchForOffsetFromPosition(50, position), "Should be able to find fourth message by correct offset"); } @@ -276,7 +281,7 @@ public class FileRecordsTest { @Test public void testIteratorWithLimits() throws IOException { RecordBatch batch = batches(fileRecords).get(1); - int start = fileRecords.searchForOffsetWithSize(1, 0).position; + int start = fileRecords.searchForOffsetFromPosition(1, 0).position; int size = batch.sizeInBytes(); FileRecords slice = fileRecords.slice(start, size); assertEquals(Collections.singletonList(batch), batches(slice)); @@ -290,7 +295,7 @@ public class FileRecordsTest { @Test public void testTruncate() throws IOException { RecordBatch batch = batches(fileRecords).get(0); - int end = fileRecords.searchForOffsetWithSize(1, 0).position; + int end = fileRecords.searchForOffsetFromPosition(1, 
0).position; fileRecords.truncateTo(end); assertEquals(Collections.singletonList(batch), batches(fileRecords)); assertEquals(batch.sizeInBytes(), fileRecords.sizeInBytes()); @@ -518,6 +523,194 @@ public class FileRecordsTest { verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten)); } + /** + * Test two conditions: + * 1. If the target offset equals the base offset of the first batch + * 2. If the target offset is less than the base offset of the first batch + *

+ * If the base offset of the first batch is equal to or greater than the target offset, the search should return the + * position of the first batch without ever calling the lastOffset method. + */ + @ParameterizedTest + @ValueSource(longs = {5, 10}) + public void testSearchForOffsetFromPosition1(long baseOffset) throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch.baseOffset()).thenReturn(baseOffset); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result); + verify(batch, never()).lastOffset(); + } + + /** + * Test the case when the target offset equals the last offset of the first (and only) batch. + */ + @Test + public void testSearchForOffsetFromPosition2() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch.baseOffset()).thenReturn(3L); + when(batch.lastOffset()).thenReturn(5L); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result); + // The base offset (3) is below the target (5), so the search must call lastOffset exactly once to confirm the batch contains the target + verify(batch, times(1)).lastOffset(); + } + + /** + * Test the case when the target offset equals the last offset of the last batch. 
+ */ + @Test + public void testSearchForOffsetFromPosition3() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(prevBatch.baseOffset()).thenReturn(5L); + when(prevBatch.lastOffset()).thenReturn(12L); + FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(currentBatch.baseOffset()).thenReturn(15L); + when(currentBatch.lastOffset()).thenReturn(20L); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, prevBatch, currentBatch); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(20L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(currentBatch), result); + // Because both base offsets are below the target, only the last batch is checked with lastOffset; the previous batch's lastOffset is never called + verify(prevBatch, never()).lastOffset(); + verify(currentBatch, times(1)).lastOffset(); + } + + /** + * Test the case when the target offset is within the range of the previous batch. 
+ */ + @Test + public void testSearchForOffsetFromPosition4() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(prevBatch.baseOffset()).thenReturn(5L); + when(prevBatch.lastOffset()).thenReturn(12L); // > targetOffset + FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(currentBatch.baseOffset()).thenReturn(15L); // > targetOffset + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, prevBatch, currentBatch); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result); + // Because the current batch starts past the target offset, we should call lastOffset + // on the previous batch to confirm that it contains the target + verify(prevBatch, times(1)).lastOffset(); + } + + /** + * Test the case when no batch matches the target offset. 
+ */ + @Test + public void testSearchForOffsetFromPosition5() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch1.baseOffset()).thenReturn(5L); // < targetOffset + FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch2.baseOffset()).thenReturn(8L); // < targetOffset + when(batch2.lastOffset()).thenReturn(9L); // < targetOffset + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch1, batch2); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0); + + assertNull(result); + // Because every base offset is below the target offset, only the last batch is checked: + // we should call lastOffset on batch2 only, and since it is below the target the result is null + verify(batch1, never()).lastOffset(); + verify(batch2, times(1)).lastOffset(); + } + + /** + * Test two conditions: + * 1. If the target offset is greater than the base offset of the last batch + * 2. 
If the target offset equals the base offset of the last batch + */ + @ParameterizedTest + @ValueSource(longs = {8, 10}) + public void testSearchForOffsetFromPosition6(long baseOffset) throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch1.baseOffset()).thenReturn(5L); // < targetOffset + FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset or == targetOffset + when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch1, batch2); + + long targetOffset = 10L; + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(targetOffset, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result); + if (targetOffset == baseOffset) { + // Because the target offset is equal to the base offset of batch2, we should not call + // lastOffset on either batch1 or batch2 + verify(batch1, never()).lastOffset(); + verify(batch2, never()).lastOffset(); + } else { + // Because the target offset is inside batch2 but past its base offset, we should call + // lastOffset exactly once on batch2 and never on batch1 + verify(batch1, never()).lastOffset(); + verify(batch2, times(1)).lastOffset(); + } + } + + /** + * Test the case when the target offset is between two batches. 
+ */ + @Test + public void testSearchForOffsetFromPosition7() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch1.baseOffset()).thenReturn(5L); + when(batch1.lastOffset()).thenReturn(10L); + FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch2.baseOffset()).thenReturn(15L); + when(batch2.lastOffset()).thenReturn(20L); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch1, batch2); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(13L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result); + // Because the target offset falls in the gap between the two batches, we should call lastOffset on batch1 to rule it out, and never on batch2 + verify(batch1, times(1)).lastOffset(); + verify(batch2, never()).lastOffset(); + } + + private void mockFileRecordBatches(FileRecords fileRecords, FileLogInputStream.FileChannelRecordBatch... 
batch) { + List batches = asList(batch); + doReturn((Iterable) batches::iterator) + .when(fileRecords) + .batchesFrom(anyInt()); + } + private void doTestConversion(Compression compression, byte toMagic) throws IOException { List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index 2710b59d2b6..6f11b796e35 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -381,7 +381,7 @@ public class LogSegment implements Closeable { */ LogOffsetPosition translateOffset(long offset, int startingFilePosition) throws IOException { OffsetPosition mapping = offsetIndex().lookup(offset); - return log.searchForOffsetWithSize(offset, Math.max(mapping.position, startingFilePosition)); + return log.searchForOffsetFromPosition(offset, Math.max(mapping.position, startingFilePosition)); } /** diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java index 9baa9ff394e..9ad98cb577e 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java @@ -600,7 +600,7 @@ public class LogSegmentTest { int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended); // start corrupting somewhere in the middle of the chosen record all the way to the end - FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetWithSize(offsetToBeginCorruption, 0); + FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetFromPosition(offsetToBeginCorruption, 0); int position = recordPosition.position + TestUtils.RANDOM.nextInt(15); 
writeNonsenseToFile(seg.log().file(), position, (int) (seg.log().file().length() - position)); seg.recover(newProducerStateManager(), mock(LeaderEpochFileCache.class));