MINOR: Refactored gap window names in share partition (#20411)

As per the suggestion by @adixitconfluent and @chirag-wadhwa5,

[here](https://github.com/apache/kafka/pull/20395#discussion_r2300810004),
I have refactored the code with variable and method names.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chirag Wadhwa
<cwadhwa@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-08-27 10:06:43 +01:00 committed by GitHub
parent c797f85de4
commit c5d0ddd6f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 168 additions and 167 deletions

View File

@ -260,10 +260,10 @@ public class SharePartition {
private long endOffset;
/**
* The initial read gap offset tracks if there are any gaps in the in-flight batch during initial
* read of the share partition state from the persister.
* The persister read result gap window tracks if there are any gaps in the in-flight batch during
* initial read of the share partition state from the persister.
*/
private InitialReadGapOffset initialReadGapOffset;
private GapWindow persisterReadResultGapWindow;
/**
* We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently.
@ -475,9 +475,9 @@ public class SharePartition {
// in the cached state are not missed
updateFindNextFetchOffset(true);
endOffset = cachedState.lastEntry().getValue().lastOffset();
// initialReadGapOffset is not required, if there are no gaps in the read state response
// gapWindow is not required, if there are no gaps in the read state response
if (gapStartOffset != -1) {
initialReadGapOffset = new InitialReadGapOffset(endOffset, gapStartOffset);
persisterReadResultGapWindow = new GapWindow(endOffset, gapStartOffset);
}
// In case the persister read state RPC result contains no AVAILABLE records, we can update cached state
// and start/end offsets.
@ -561,20 +561,20 @@ public class SharePartition {
}
long nextFetchOffset = -1;
long gapStartOffset = isInitialReadGapOffsetWindowActive() ? initialReadGapOffset.gapStartOffset() : -1;
long gapStartOffset = isPersisterReadGapWindowActive() ? persisterReadResultGapWindow.gapStartOffset() : -1;
for (Map.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
// Check if there exists any gap in the in-flight batch which needs to be fetched. If
// initialReadGapOffset's endOffset is equal to the share partition's endOffset, then
// gapWindow's endOffset is equal to the share partition's endOffset, then
// only the initial gaps should be considered. Once share partition's endOffset is past
// initial read end offset then all gaps are anyway fetched.
if (isInitialReadGapOffsetWindowActive()) {
if (isPersisterReadGapWindowActive()) {
if (entry.getKey() > gapStartOffset) {
nextFetchOffset = gapStartOffset;
break;
}
// If the gapStartOffset is already past the last offset of the in-flight batch,
// then do not consider this batch for finding the next fetch offset. For example,
// consider during initialization, the initialReadGapOffset is set to 5 and the
// consider during initialization, the gapWindow is set to 5 and the
// first cached batch is 15-18. First read will happen at offset 5 and say the data
// fetched is [5-6], now next fetch offset should be 7. This works fine but say
// subsequent read returns batch 8-11, and the gapStartOffset will be 12. Without
@ -769,10 +769,10 @@ public class SharePartition {
}
InFlightBatch inFlightBatch = entry.getValue();
// If the initialReadGapOffset window is active, we need to treat the gaps in between the window as
// If the gapWindow window is active, we need to treat the gaps in between the window as
// acquirable. Once the window is inactive (when we have acquired all the gaps inside the window),
// the remaining gaps are natural (data does not exist at those offsets) and we need not acquire them.
if (isInitialReadGapOffsetWindowActive()) {
if (isPersisterReadGapWindowActive()) {
// If nextBatchStartOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
// Thus, a new batch needs to be acquired for the gap.
if (maybeGapStartOffset < entry.getKey()) {
@ -858,7 +858,7 @@ public class SharePartition {
acquiredCount += shareAcquiredRecords.count();
}
if (!result.isEmpty()) {
maybeUpdateReadGapFetchOffset(result.get(result.size() - 1).lastOffset() + 1);
maybeUpdatePersisterGapWindowStartOffset(result.get(result.size() - 1).lastOffset() + 1);
return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, new ShareAcquiredRecords(result, acquiredCount));
}
return new ShareAcquiredRecords(result, acquiredCount);
@ -1469,20 +1469,20 @@ public class SharePartition {
}
// Method to reduce the window that tracks gaps in the cachedState
private void maybeUpdateReadGapFetchOffset(long offset) {
private void maybeUpdatePersisterGapWindowStartOffset(long offset) {
lock.writeLock().lock();
try {
if (initialReadGapOffset != null) {
// When last cached batch for initial read gap window is acquired, then endOffset is
// same as the initialReadGapOffset's endOffset, but the gap offset to update is
// endOffset + 1. Hence, do not update the gap start offset if the request offset
if (persisterReadResultGapWindow != null) {
// When last cached batch for persister's read gap window is acquired, then endOffset is
// same as the gapWindow's endOffset, but the gap offset to update in the method call
// is endOffset + 1. Hence, do not update the gap start offset if the request offset
// is ahead of the endOffset.
if (initialReadGapOffset.endOffset() == endOffset && offset <= initialReadGapOffset.endOffset()) {
initialReadGapOffset.gapStartOffset(offset);
if (persisterReadResultGapWindow.endOffset() == endOffset && offset <= persisterReadResultGapWindow.endOffset()) {
persisterReadResultGapWindow.gapStartOffset(offset);
} else {
// The initial read gap offset is not valid anymore as the end offset has moved
// beyond the initial read gap offset. Hence, reset the initial read gap offset.
initialReadGapOffset = null;
// The persister's read gap window is not valid anymore as the end offset has moved
// beyond the read gap window's endOffset. Hence, set the gap window to null.
persisterReadResultGapWindow = null;
}
}
} finally {
@ -1570,14 +1570,14 @@ public class SharePartition {
// batches align on batch boundaries. Hence, reset to last offset itself if the batch's
// last offset is greater than the last offset for acquisition, else there could be
// a situation where the batch overlaps with the initial read gap offset window batch.
// For example, if the initial read gap offset window is 10-30 i.e. initialReadGapOffset's
// For example, if the initial read gap offset window is 10-30 i.e. gapWindow's
// startOffset is 10 and endOffset is 30, and the first persister's read batch is 15-30.
// Say first fetched batch from log is 10-30 and maxFetchRecords is 1, then the lastOffset
// in this method call would be 14. As the maxFetchRecords is lesser than the batch,
// hence last batch offset for request offset is fetched. In this example it will
// be 30, hence check if the initial read gap offset window is active and the last acquired
// offset should be adjusted to 14 instead of 30.
if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
if (isPersisterReadGapWindowActive() && lastAcquiredOffset > lastOffset) {
lastAcquiredOffset = lastOffset;
}
}
@ -1596,7 +1596,7 @@ public class SharePartition {
if (lastAcquiredOffset > endOffset) {
endOffset = lastAcquiredOffset;
}
maybeUpdateReadGapFetchOffset(lastAcquiredOffset + 1);
maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1);
return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
@ -2203,15 +2203,15 @@ public class SharePartition {
// If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
if (isInitialReadGapOffsetWindowActive()) {
if (isPersisterReadGapWindowActive()) {
// This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
// Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked.
// There is a gap from 21 to 30. Let the initialReadGapOffset.gapStartOffset be 21. In this case,
// There is a gap from 21 to 30. Let the gapWindow's gapStartOffset be 21. In this case,
// lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
// of next cachedState batch (next cachedState batch is 31 to 40). There is an acquirable gap in between (21 to 30)
// and The startOffset should be at 21. Hence, we set startOffset to the minimum of initialReadGapOffset.gapStartOffset
// and The startOffset should be at 21. Hence, we set startOffset to the minimum of gapWindow.gapStartOffset
// and higher key of lastOffsetAcknowledged
startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
startOffset = Math.min(persisterReadResultGapWindow.gapStartOffset(), startOffset);
}
lastKeyToRemove = entry.getKey();
} else {
@ -2276,8 +2276,8 @@ public class SharePartition {
return isRecordStateAcknowledged(startOffsetState);
}
private boolean isInitialReadGapOffsetWindowActive() {
return initialReadGapOffset != null && initialReadGapOffset.endOffset() == endOffset;
private boolean isPersisterReadGapWindowActive() {
return persisterReadResultGapWindow != null && persisterReadResultGapWindow.endOffset() == endOffset;
}
/**
@ -2300,7 +2300,7 @@ public class SharePartition {
for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
if (isInitialReadGapOffsetWindowActive() && inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
if (isPersisterReadGapWindowActive() && inFlightBatch.lastOffset() >= persisterReadResultGapWindow.gapStartOffset()) {
return lastOffsetAcknowledged;
}
@ -2865,8 +2865,8 @@ public class SharePartition {
}
// Visible for testing
InitialReadGapOffset initialReadGapOffset() {
return initialReadGapOffset;
GapWindow persisterReadResultGapWindow() {
return persisterReadResultGapWindow;
}
// Visible for testing.
@ -2875,17 +2875,17 @@ public class SharePartition {
}
/**
* The InitialReadGapOffset class is used to record the gap start and end offset of the probable gaps
* The GapWindow class is used to record the gap start and end offset of the probable gaps
* of available records which are neither known to Persister nor to SharePartition. Share Partition
* will use this information to determine the next fetch offset and should try to fetch the records
* in the gap.
*/
// Visible for Testing
static class InitialReadGapOffset {
static class GapWindow {
private final long endOffset;
private long gapStartOffset;
InitialReadGapOffset(long endOffset, long gapStartOffset) {
GapWindow(long endOffset, long gapStartOffset) {
this.endOffset = endOffset;
this.gapStartOffset = gapStartOffset;
}

View File

@ -17,6 +17,7 @@
package kafka.server.share;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartition.GapWindow;
import kafka.server.share.SharePartition.SharePartitionState;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
@ -965,11 +966,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.cachedState().get(21L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(21L).offsetState());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(10, initialReadGapOffset.gapStartOffset());
assertEquals(30, initialReadGapOffset.endOffset());
assertEquals(10, persisterReadResultGapWindow.gapStartOffset());
assertEquals(30, persisterReadResultGapWindow.endOffset());
}
@Test
@ -1010,11 +1011,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(30L).offsetState());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(10, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
assertEquals(10, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -1051,11 +1052,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(30L).offsetState());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(21, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -1082,10 +1083,10 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(31, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
// Since there are no gaps present in the readState response, initialReadGapOffset should be null
assertNull(initialReadGapOffset);
// Since there are no gaps present in the readState response, persisterReadResultGapWindow should be null
assertNull(persisterReadResultGapWindow);
}
@Test
@ -1118,9 +1119,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
// The records in the batch are from 10 to 49.
@ -1146,8 +1147,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(15L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(15L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Send the same batch again to acquire the next set of records.
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@ -1180,8 +1181,8 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.cachedState().get(23L).batchDeliveryCount());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
// As all the gaps are now filled, the initialReadGapOffset should be null.
assertNull(sharePartition.initialReadGapOffset());
// As all the gaps are now filled, the persisterReadResultGapWindow should be null.
assertNull(sharePartition.persisterReadResultGapWindow());
// Now initial read gap is filled, so the complete batch can be acquired despite max fetch records being 1.
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@ -1234,9 +1235,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
// The records in the batch are from 10 to 49.
@ -1278,8 +1279,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState());
assertEquals(49L, sharePartition.endOffset());
// As all the gaps are now filled, the initialReadGapOffset should be null.
assertNull(sharePartition.initialReadGapOffset());
// As all the gaps are now filled, the persisterReadResultGapWindow should be null.
assertNull(sharePartition.persisterReadResultGapWindow());
}
@Test
@ -1312,9 +1313,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record that ends in between the cached batch and the fetch offset is
// post startOffset.
@ -1357,8 +1358,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(28L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(28L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@ -1391,9 +1392,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record where first offset is prior startOffset.
MemoryRecords records = memoryRecords(16, 6);
@ -1425,8 +1426,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(20L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(20L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@ -1459,9 +1460,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(5L, sharePartition.initialReadGapOffset().gapStartOffset());
assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(5L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset());
// Create multiple batch records that covers the entire range from 5 to 30 of initial read gap.
// The records in the batch are from 5 to 49.
@ -1496,8 +1497,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(7L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(7L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Remove first batch from the records as the fetch offset has moved forward to 7 offset.
List<RecordBatch> batch = TestUtils.toList(records.batches());
@ -1524,8 +1525,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(12L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(12L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Remove the next 2 batches from the records as the fetch offset has moved forward to 12 offset.
int size = batch.get(1).sizeInBytes() + batch.get(2).sizeInBytes();
@ -1561,8 +1562,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(26L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(26L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Remove the next 2 batches from the records as the fetch offset has moved forward to 26 offset.
// Do not remove the 5th batch as it's only partially acquired.
@ -1590,8 +1591,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState());
assertEquals(49L, sharePartition.endOffset());
// As all the gaps are now filled, the initialReadGapOffset should be null.
assertNull(sharePartition.initialReadGapOffset());
// As all the gaps are now filled, the persisterReadResultGapWindow should be null.
assertNull(sharePartition.persisterReadResultGapWindow());
}
@Test
@ -1624,9 +1625,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(5L, sharePartition.initialReadGapOffset().gapStartOffset());
assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(5L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset());
// Create multiple batch records that ends in between the cached batch and the fetch offset is
// post startOffset.
@ -1676,8 +1677,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(28L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(28L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@ -1710,9 +1711,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset());
// Create multiple batch records where multiple batches base offsets are prior startOffset.
ByteBuffer buffer = ByteBuffer.allocate(4096);
@ -1750,8 +1751,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
assertEquals(20L, sharePartition.initialReadGapOffset().gapStartOffset());
assertNotNull(sharePartition.persisterReadResultGapWindow());
assertEquals(20L, sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@ -3034,12 +3035,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(16, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// After records are acquired, the initialReadGapOffset should be updated
assertEquals(16, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
// After records are acquired, the persisterReadResultGapWindow should be updated
assertEquals(16, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3073,12 +3074,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(41, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// After records are acquired, the initialReadGapOffset should be updated
assertEquals(21, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
// After records are acquired, the persisterReadResultGapWindow should be updated
assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3126,12 +3127,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(26, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// After records are acquired, the initialReadGapOffset should be updated
assertEquals(26, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
// After records are acquired, the persisterReadResultGapWindow should be updated
assertEquals(26, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3170,12 +3171,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(26, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// After records are acquired, the initialReadGapOffset should be updated
assertEquals(26, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
// After records are acquired, the persisterReadResultGapWindow should be updated
assertEquals(26, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3227,12 +3228,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(86, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// After records are acquired, the initialReadGapOffset should be updated
assertEquals(86, initialReadGapOffset.gapStartOffset());
assertEquals(90, initialReadGapOffset.endOffset());
// After records are acquired, the persisterReadResultGapWindow should be updated
assertEquals(86, persisterReadResultGapWindow.gapStartOffset());
assertEquals(90, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3271,12 +3272,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(31, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// After records are acquired, the initialReadGapOffset should be updated
assertEquals(31, initialReadGapOffset.gapStartOffset());
assertEquals(70, initialReadGapOffset.endOffset());
// After records are acquired, the persisterReadResultGapWindow should be updated
assertEquals(31, persisterReadResultGapWindow.gapStartOffset());
assertEquals(70, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3322,12 +3323,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(76, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// After records are acquired, the initialReadGapOffset should be updated
assertEquals(76, initialReadGapOffset.gapStartOffset());
assertEquals(90, initialReadGapOffset.endOffset());
// After records are acquired, the persisterReadResultGapWindow should be updated
assertEquals(76, persisterReadResultGapWindow.gapStartOffset());
assertEquals(90, persisterReadResultGapWindow.endOffset());
}
@ -3375,11 +3376,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(27, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(27, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
assertEquals(27, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3424,11 +3425,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(21, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(21, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3473,11 +3474,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(21, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(21, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -3525,8 +3526,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(51, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNull(persisterReadResultGapWindow);
}
@Test
@ -3569,8 +3570,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(61, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNull(persisterReadResultGapWindow);
}
@Test
@ -3615,8 +3616,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(61, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNull(persisterReadResultGapWindow);
}
@Test
@ -3664,8 +3665,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(61, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNull(persisterReadResultGapWindow);
}
@Test
@ -3705,11 +3706,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(41, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(31, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
assertEquals(31, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
// Fetching from the nextFetchOffset so that endOffset moves ahead
records = memoryRecords(15, 41);
@ -3725,9 +3726,9 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(56, sharePartition.nextFetchOffset());
// Since the endOffset is now moved ahead, the initialReadGapOffset should be empty
initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNull(initialReadGapOffset);
// Since the endOffset is now moved ahead, the persisterReadResultGapWindow should be empty
persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNull(persisterReadResultGapWindow);
}
@Test
@ -6782,11 +6783,11 @@ public class SharePartitionTest {
assertEquals(40, sharePartition.endOffset());
assertEquals(21, sharePartition.nextFetchOffset());
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
assertEquals(21, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@ -7356,16 +7357,16 @@ public class SharePartitionTest {
sharePartition.maybeInitialize();
SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
assertNotNull(initialReadGapOffset);
GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow();
assertNotNull(persisterReadResultGapWindow);
// Since there is a gap in the beginning, the initialReadGapOffset window is same as the cachedState
assertEquals(11, initialReadGapOffset.gapStartOffset());
assertEquals(40, initialReadGapOffset.endOffset());
// Since there is a gap in the beginning, the persisterReadResultGapWindow window is same as the cachedState
assertEquals(11, persisterReadResultGapWindow.gapStartOffset());
assertEquals(40, persisterReadResultGapWindow.endOffset());
long lastOffsetAcknowledged = sharePartition.findLastOffsetAcknowledged();
// Since the initialReadGapOffset window begins at startOffset, we cannot count any of the offsets as acknowledged.
// Since the persisterReadResultGapWindow window begins at startOffset, we cannot count any of the offsets as acknowledged.
// Thus, lastOffsetAcknowledged should be -1
assertEquals(-1, lastOffsetAcknowledged);
}