KAFKA-18989 Optimize FileRecord#searchForOffsetWithSize (#19214)

Computing `lastOffset` requires reading the entire batch header, so we should check `baseOffset` instead whenever possible.

To optimize this, we need to update the search logic. The previous
approach simply checked whether each batch's `lastOffset()` was greater
than or equal to the target offset. Once it found the first batch that
met this condition, it returned that batch immediately.

Now that we are using `baseOffset()`, we need to handle a special case:
if the `targetOffset` falls between the `lastOffset` of the previous
batch and the `baseOffset` of the matching batch, we should select the
matching batch. The updated logic is structured as follows (a short
sketch appears after the list):

1. If `baseOffset` exactly equals `targetOffset`, return that batch immediately.
2. When we find the first batch whose `baseOffset` is greater than `targetOffset`:
    - If the previous batch contains the target (its `lastOffset` is at least `targetOffset`), return the previous batch.
    - If there is no previous batch, or the previous batch does not contain the target, return the current batch.
3. After iterating through all batches, check whether the last batch contains
the target offset.
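
To make the control flow concrete, here is a minimal, self-contained sketch of the logic above. The `Batch` record, the `search` helper, and the offsets are hypothetical stand-ins for Kafka's `FileChannelRecordBatch`, not the actual implementation (the real change appears in the diff below):

```java
import java.util.List;

public class SearchSketch {
    // Hypothetical stand-in for FileChannelRecordBatch; real batches read these values from disk.
    record Batch(long baseOffset, long lastOffset) { }

    // Mirrors the described logic: compare baseOffset first, and consult lastOffset only when the
    // target falls before the first batch whose baseOffset exceeds it, or after the loop ends.
    static Batch search(List<Batch> batches, long targetOffset) {
        Batch prev = null;
        for (Batch batch : batches) {
            if (batch.baseOffset() == targetOffset)
                return batch;                                   // step 1: exact base-offset match
            if (batch.baseOffset() > targetOffset) {            // step 2: first batch past the target
                if (prev != null && prev.lastOffset() >= targetOffset)
                    return prev;                                // previous batch contains the target
                return batch;                                   // gap, or no previous batch: take the current batch
            }
            prev = batch;
        }
        // step 3: every baseOffset was below the target; check whether the last batch covers it
        return (prev != null && prev.lastOffset() >= targetOffset) ? prev : null;
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(new Batch(0, 9), new Batch(15, 20));
        System.out.println(search(batches, 12)); // gap between batches -> Batch[baseOffset=15, lastOffset=20]
        System.out.println(search(batches, 18)); // inside the last batch -> Batch[baseOffset=15, lastOffset=20]
        System.out.println(search(batches, 25)); // beyond the log -> null
    }
}
```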

This code path is not thread-safe, so we need to guard against `EOFException`. To avoid that exception, I am still returning early from inside the loop. In that scenario, `lastOffset` is still consulted within the loop, but it is executed at most once.

Therefore, in the new implementation, `lastOffset` is executed at most once per search. In most cases, this results in an optimization over calling it for every batch.
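
To illustrate the at-most-once claim, here is a hedged, standalone sketch that counts `lastOffset` lookups for a hypothetical two-batch layout; the class, names, and offsets are made up, and the real verification lives in the Mockito-based tests in the diff below:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class LastOffsetCallCount {
    // Hypothetical batch that records how often lastOffset() is consulted.
    record Batch(long baseOffset, long lastOffsetValue, AtomicInteger lastOffsetCalls) {
        long lastOffset() {
            lastOffsetCalls.incrementAndGet();
            return lastOffsetValue;
        }
    }

    public static void main(String[] args) {
        Batch first = new Batch(0, 9, new AtomicInteger());
        Batch second = new Batch(15, 20, new AtomicInteger());
        long targetOffset = 12L; // falls in the gap between the two batches

        // Same control flow as the sketch above.
        Batch prev = null;
        Batch found = null;
        for (Batch batch : List.of(first, second)) {
            if (batch.baseOffset() == targetOffset) { found = batch; break; }
            if (batch.baseOffset() > targetOffset) {
                found = (prev != null && prev.lastOffset() >= targetOffset) ? prev : batch;
                break;
            }
            prev = batch;
        }
        if (found == null && prev != null && prev.lastOffset() >= targetOffset) found = prev;

        System.out.println(found == second);                // true: the gap resolves to the later batch
        System.out.println(first.lastOffsetCalls().get());  // 1: consulted once, on the previous batch
        System.out.println(second.lastOffsetCalls().get()); // 0: never consulted
    }
}
```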

Test: Verifying Memory Usage Improvement  
To evaluate whether this optimization helps, I followed the steps below
to monitor memory usage:

1. Start a Standalone Kafka Server  
```sh
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
bin/kafka-server-start.sh config/server.properties
```  

2. Use Performance Console Tools to Produce and Consume Records  

**Produce Records:**  
```sh
./kafka-producer-perf-test.sh \
  --topic test-topic \
  --num-records 1000000000 \
  --record-size 100 \
  --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092
```  
**Consume Records:**  
```sh
./bin/kafka-consumer-perf-test.sh \
  --topic test-topic \
  --messages 1000000000 \
  --bootstrap-server localhost:9092
```  
It can be observed that memory usage has significantly decreased.

trunk:
![CleanShot 2025-03-16 at 11 53 31@2x](https://github.com/user-attachments/assets/eec26b1d-38ed-41c8-8c49-e5c68643761b)

this PR:
![CleanShot 2025-03-16 at 17 41 56@2x](https://github.com/user-attachments/assets/c8d4c234-18c2-4642-88ae-9f96cf54fccc)

Reviewers: Kirk True <kirk@kirktrue.pro>, TengYao Chi
<kitingiao@gmail.com>, David Arthur <mumrah@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Ken Huang 2025-03-20 16:33:35 +08:00 committed by GitHub
parent e73719d962
commit 31e1a57c41
4 changed files with 236 additions and 13 deletions


@@ -292,17 +292,43 @@ public class FileRecords extends AbstractRecords implements Closeable {
/**
* Search forward for the file position of the message batch whose last offset is greater
* than or equal to the target offset. If no such batch is found, return null.
* <p>
* The following logic is intentionally designed to minimize memory usage
* by avoiding unnecessary calls to {@link FileChannelRecordBatch#lastOffset()} for every batch.
* Instead, we use {@link FileChannelRecordBatch#baseOffset()} comparisons when possible, and only
* check {@link FileChannelRecordBatch#lastOffset()} when absolutely necessary.
*
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
* @return the batch's base offset, its physical position, and its size (including log overhead)
*/
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
public LogOffsetPosition searchForOffsetFromPosition(long targetOffset, int startingPosition) {
FileChannelRecordBatch prevBatch = null;
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
// If baseOffset exactly equals targetOffset, return immediately
if (batch.baseOffset() == targetOffset) {
return LogOffsetPosition.fromBatch(batch);
}
// If we find the first batch with baseOffset greater than targetOffset
if (batch.baseOffset() > targetOffset) {
// If the previous batch contains the target
if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
return LogOffsetPosition.fromBatch(prevBatch);
else {
// If there's no previous batch or the previous batch doesn't contain the
// target, return the current batch
return LogOffsetPosition.fromBatch(batch);
}
}
prevBatch = batch;
}
// Only one case would reach here: all batches have baseOffset less than targetOffset
// Check if the last batch contains the target
if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
return LogOffsetPosition.fromBatch(prevBatch);
return null;
}
@@ -463,6 +489,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
public final long offset;
public final int position;
public final int size;
public static LogOffsetPosition fromBatch(FileChannelRecordBatch batch) {
return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
}
public LogOffsetPosition(long offset, int position, int size) {
this.offset = offset;


@@ -26,6 +26,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import java.io.File;
@@ -51,10 +53,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -248,25 +253,25 @@ public class FileRecordsTest {
int message1Size = batches.get(0).sizeInBytes();
assertEquals(new FileRecords.LogOffsetPosition(0L, position, message1Size),
fileRecords.searchForOffsetWithSize(0, 0),
fileRecords.searchForOffsetFromPosition(0, 0),
"Should be able to find the first message by its offset");
position += message1Size;
int message2Size = batches.get(1).sizeInBytes();
assertEquals(new FileRecords.LogOffsetPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, 0),
fileRecords.searchForOffsetFromPosition(1, 0),
"Should be able to find second message when starting from 0");
assertEquals(new FileRecords.LogOffsetPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, position),
fileRecords.searchForOffsetFromPosition(1, position),
"Should be able to find second message starting from its offset");
position += message2Size + batches.get(2).sizeInBytes();
int message4Size = batches.get(3).sizeInBytes();
assertEquals(new FileRecords.LogOffsetPosition(50L, position, message4Size),
fileRecords.searchForOffsetWithSize(3, position),
fileRecords.searchForOffsetFromPosition(3, position),
"Should be able to find fourth message from a non-existent offset");
assertEquals(new FileRecords.LogOffsetPosition(50L, position, message4Size),
fileRecords.searchForOffsetWithSize(50, position),
fileRecords.searchForOffsetFromPosition(50, position),
"Should be able to find fourth message by correct offset");
}
@@ -276,7 +281,7 @@ public class FileRecordsTest {
@Test
public void testIteratorWithLimits() throws IOException {
RecordBatch batch = batches(fileRecords).get(1);
int start = fileRecords.searchForOffsetWithSize(1, 0).position;
int start = fileRecords.searchForOffsetFromPosition(1, 0).position;
int size = batch.sizeInBytes();
FileRecords slice = fileRecords.slice(start, size);
assertEquals(Collections.singletonList(batch), batches(slice));
@@ -290,7 +295,7 @@ public class FileRecordsTest {
@Test
public void testTruncate() throws IOException {
RecordBatch batch = batches(fileRecords).get(0);
int end = fileRecords.searchForOffsetWithSize(1, 0).position;
int end = fileRecords.searchForOffsetFromPosition(1, 0).position;
fileRecords.truncateTo(end);
assertEquals(Collections.singletonList(batch), batches(fileRecords));
assertEquals(batch.sizeInBytes(), fileRecords.sizeInBytes());
@@ -518,6 +523,194 @@ public class FileRecordsTest {
verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
}
/**
* Test two conditions:
* 1. If the target offset equals the base offset of the first batch
* 2. If the target offset is less than the base offset of the first batch
* <p>
* If the base offset of the first batch is equal to or greater than the target offset, it should return the
* position of the first batch and the lastOffset method should not be called.
*/
@ParameterizedTest
@ValueSource(longs = {5, 10})
public void testSearchForOffsetFromPosition1(long baseOffset) throws IOException {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch.baseOffset()).thenReturn(baseOffset);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
mockFileRecordBatches(fileRecords, batch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
verify(batch, never()).lastOffset();
}
/**
* Test the case when the target offset equals the last offset of the first batch.
*/
@Test
public void testSearchForOffsetFromPosition2() throws IOException {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch.baseOffset()).thenReturn(3L);
when(batch.lastOffset()).thenReturn(5L);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
mockFileRecordBatches(fileRecords, batch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
// target is equal to the last offset of the batch, we should call lastOffset
verify(batch, times(1)).lastOffset();
}
/**
* Test the case when the target offset equals the last offset of the last batch.
*/
@Test
public void testSearchForOffsetFromPosition3() throws IOException {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(prevBatch.baseOffset()).thenReturn(5L);
when(prevBatch.lastOffset()).thenReturn(12L);
FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(currentBatch.baseOffset()).thenReturn(15L);
when(currentBatch.lastOffset()).thenReturn(20L);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(20L, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(currentBatch), result);
// Because the target offset is in the current batch, we should not call lastOffset in the previous batch
verify(prevBatch, never()).lastOffset();
verify(currentBatch, times(1)).lastOffset();
}
/**
* Test the case when the target offset is within the range of the previous batch.
*/
@Test
public void testSearchForOffsetFromPosition4() throws IOException {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(prevBatch.baseOffset()).thenReturn(5L);
when(prevBatch.lastOffset()).thenReturn(12L); // > targetOffset
FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result);
// Because the target offset falls within the previous batch, we should call lastOffset
// on the previous batch
verify(prevBatch, times(1)).lastOffset();
}
/**
* Test the case when no batch matches the target offset.
*/
@Test
public void testSearchForOffsetFromPosition5() throws IOException {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch1.baseOffset()).thenReturn(5L); // < targetOffset
FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch2.baseOffset()).thenReturn(8L); // < targetOffset
when(batch2.lastOffset()).thenReturn(9L); // < targetOffset
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
mockFileRecordBatches(fileRecords, batch1, batch2);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
assertNull(result);
// Because the target offset exceeds the last offset of batch2,
// we still need to call lastOffset on batch2 once after the loop
verify(batch1, never()).lastOffset();
verify(batch2, times(1)).lastOffset();
}
/**
* Test two conditions:
* 1. If the target offset is less than the base offset of the last batch
* 2. If the target offset equals the base offset of the last batch
*/
@ParameterizedTest
@ValueSource(longs = {8, 10})
public void testSearchForOffsetFromPosition6(long baseOffset) throws IOException {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch1.baseOffset()).thenReturn(5L); // < targetOffset
FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset or == targetOffset
when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
mockFileRecordBatches(fileRecords, batch1, batch2);
long targetOffset = 10L;
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(targetOffset, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result);
if (targetOffset == baseOffset) {
// Because the target offset is equal to the base offset of the batch2, we should not call
// lastOffset on batch2 and batch1
verify(batch1, never()).lastOffset();
verify(batch2, never()).lastOffset();
} else {
// Because the target offset is in the batch2, we should not call
// lastOffset on batch1
verify(batch1, never()).lastOffset();
verify(batch2, times(1)).lastOffset();
}
}
/**
* Test the case when the target offset is between two batches.
*/
@Test
public void testSearchForOffsetFromPosition7() throws IOException {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch1.baseOffset()).thenReturn(5L);
when(batch1.lastOffset()).thenReturn(10L);
FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch2.baseOffset()).thenReturn(15L);
when(batch2.lastOffset()).thenReturn(20L);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
mockFileRecordBatches(fileRecords, batch1, batch2);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(13L, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result);
// Because the target offset is between the two batches, we should call lastOffset on the batch1
verify(batch1, times(1)).lastOffset();
verify(batch2, never()).lastOffset();
}
private void mockFileRecordBatches(FileRecords fileRecords, FileLogInputStream.FileChannelRecordBatch... batch) {
List<FileLogInputStream.FileChannelRecordBatch> batches = asList(batch);
doReturn((Iterable<FileLogInputStream.FileChannelRecordBatch>) batches::iterator)
.when(fileRecords)
.batchesFrom(anyInt());
}
private void doTestConversion(Compression compression, byte toMagic) throws IOException {
List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);


@@ -381,7 +381,7 @@ public class LogSegment implements Closeable {
*/
LogOffsetPosition translateOffset(long offset, int startingFilePosition) throws IOException {
OffsetPosition mapping = offsetIndex().lookup(offset);
return log.searchForOffsetWithSize(offset, Math.max(mapping.position, startingFilePosition));
return log.searchForOffsetFromPosition(offset, Math.max(mapping.position, startingFilePosition));
}
/**


@@ -600,7 +600,7 @@ public class LogSegmentTest {
int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended);
// start corrupting somewhere in the middle of the chosen record all the way to the end
FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetWithSize(offsetToBeginCorruption, 0);
FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetFromPosition(offsetToBeginCorruption, 0);
int position = recordPosition.position + TestUtils.RANDOM.nextInt(15);
writeNonsenseToFile(seg.log().file(), position, (int) (seg.log().file().length() - position));
seg.recover(newProducerStateManager(), mock(LeaderEpochFileCache.class));