From c5d0ddd6f715d1b194446aa045e3fb22712aee50 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Wed, 27 Aug 2025 10:06:43 +0100 Subject: [PATCH] MINOR: Refactored gap window names in share partition (#20411) As per the suggestion by @adixitconfluent and @chirag-wadhwa5, [here](https://github.com/apache/kafka/pull/20395#discussion_r2300810004), I have refactored the code with variable and method names. Reviewers: Andrew Schofield , Chirag Wadhwa --- .../kafka/server/share/SharePartition.java | 74 ++--- .../server/share/SharePartitionTest.java | 261 +++++++++--------- 2 files changed, 168 insertions(+), 167 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 08a9539dbed..8ed094b85f1 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -260,10 +260,10 @@ public class SharePartition { private long endOffset; /** - * The initial read gap offset tracks if there are any gaps in the in-flight batch during initial - * read of the share partition state from the persister. + * The persister read result gap window tracks if there are any gaps in the in-flight batch during + * initial read of the share partition state from the persister. */ - private InitialReadGapOffset initialReadGapOffset; + private GapWindow persisterReadResultGapWindow; /** * We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently. @@ -475,9 +475,9 @@ public class SharePartition { // in the cached state are not missed updateFindNextFetchOffset(true); endOffset = cachedState.lastEntry().getValue().lastOffset(); - // initialReadGapOffset is not required, if there are no gaps in the read state response + // gapWindow is not required, if there are no gaps in the read state response if (gapStartOffset != -1) { - initialReadGapOffset = new InitialReadGapOffset(endOffset, gapStartOffset); + persisterReadResultGapWindow = new GapWindow(endOffset, gapStartOffset); } // In case the persister read state RPC result contains no AVAILABLE records, we can update cached state // and start/end offsets. @@ -561,20 +561,20 @@ public class SharePartition { } long nextFetchOffset = -1; - long gapStartOffset = isInitialReadGapOffsetWindowActive() ? initialReadGapOffset.gapStartOffset() : -1; + long gapStartOffset = isPersisterReadGapWindowActive() ? persisterReadResultGapWindow.gapStartOffset() : -1; for (Map.Entry entry : cachedState.entrySet()) { // Check if there exists any gap in the in-flight batch which needs to be fetched. If - // initialReadGapOffset's endOffset is equal to the share partition's endOffset, then + // gapWindow's endOffset is equal to the share partition's endOffset, then // only the initial gaps should be considered. Once share partition's endOffset is past // initial read end offset then all gaps are anyway fetched. - if (isInitialReadGapOffsetWindowActive()) { + if (isPersisterReadGapWindowActive()) { if (entry.getKey() > gapStartOffset) { nextFetchOffset = gapStartOffset; break; } // If the gapStartOffset is already past the last offset of the in-flight batch, // then do not consider this batch for finding the next fetch offset. For example, - // consider during initialization, the initialReadGapOffset is set to 5 and the + // consider during initialization, the gapWindow is set to 5 and the // first cached batch is 15-18. First read will happen at offset 5 and say the data // fetched is [5-6], now next fetch offset should be 7. This works fine but say // subsequent read returns batch 8-11, and the gapStartOffset will be 12. Without @@ -769,10 +769,10 @@ public class SharePartition { } InFlightBatch inFlightBatch = entry.getValue(); - // If the initialReadGapOffset window is active, we need to treat the gaps in between the window as + // If the gapWindow window is active, we need to treat the gaps in between the window as // acquirable. Once the window is inactive (when we have acquired all the gaps inside the window), // the remaining gaps are natural (data does not exist at those offsets) and we need not acquire them. - if (isInitialReadGapOffsetWindowActive()) { + if (isPersisterReadGapWindowActive()) { // If nextBatchStartOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState. // Thus, a new batch needs to be acquired for the gap. if (maybeGapStartOffset < entry.getKey()) { @@ -858,7 +858,7 @@ public class SharePartition { acquiredCount += shareAcquiredRecords.count(); } if (!result.isEmpty()) { - maybeUpdateReadGapFetchOffset(result.get(result.size() - 1).lastOffset() + 1); + maybeUpdatePersisterGapWindowStartOffset(result.get(result.size() - 1).lastOffset() + 1); return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, new ShareAcquiredRecords(result, acquiredCount)); } return new ShareAcquiredRecords(result, acquiredCount); @@ -1469,20 +1469,20 @@ public class SharePartition { } // Method to reduce the window that tracks gaps in the cachedState - private void maybeUpdateReadGapFetchOffset(long offset) { + private void maybeUpdatePersisterGapWindowStartOffset(long offset) { lock.writeLock().lock(); try { - if (initialReadGapOffset != null) { - // When last cached batch for initial read gap window is acquired, then endOffset is - // same as the initialReadGapOffset's endOffset, but the gap offset to update is - // endOffset + 1. Hence, do not update the gap start offset if the request offset + if (persisterReadResultGapWindow != null) { + // When last cached batch for persister's read gap window is acquired, then endOffset is + // same as the gapWindow's endOffset, but the gap offset to update in the method call + // is endOffset + 1. Hence, do not update the gap start offset if the request offset // is ahead of the endOffset. - if (initialReadGapOffset.endOffset() == endOffset && offset <= initialReadGapOffset.endOffset()) { - initialReadGapOffset.gapStartOffset(offset); + if (persisterReadResultGapWindow.endOffset() == endOffset && offset <= persisterReadResultGapWindow.endOffset()) { + persisterReadResultGapWindow.gapStartOffset(offset); } else { - // The initial read gap offset is not valid anymore as the end offset has moved - // beyond the initial read gap offset. Hence, reset the initial read gap offset. - initialReadGapOffset = null; + // The persister's read gap window is not valid anymore as the end offset has moved + // beyond the read gap window's endOffset. Hence, set the gap window to null. + persisterReadResultGapWindow = null; } } } finally { @@ -1570,14 +1570,14 @@ public class SharePartition { // batches align on batch boundaries. Hence, reset to last offset itself if the batch's // last offset is greater than the last offset for acquisition, else there could be // a situation where the batch overlaps with the initial read gap offset window batch. - // For example, if the initial read gap offset window is 10-30 i.e. initialReadGapOffset's + // For example, if the initial read gap offset window is 10-30 i.e. gapWindow's // startOffset is 10 and endOffset is 30, and the first persister's read batch is 15-30. // Say first fetched batch from log is 10-30 and maxFetchRecords is 1, then the lastOffset // in this method call would be 14. As the maxFetchRecords is lesser than the batch, // hence last batch offset for request offset is fetched. In this example it will // be 30, hence check if the initial read gap offset window is active and the last acquired // offset should be adjusted to 14 instead of 30. - if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) { + if (isPersisterReadGapWindowActive() && lastAcquiredOffset > lastOffset) { lastAcquiredOffset = lastOffset; } } @@ -1596,7 +1596,7 @@ public class SharePartition { if (lastAcquiredOffset > endOffset) { endOffset = lastAcquiredOffset; } - maybeUpdateReadGapFetchOffset(lastAcquiredOffset + 1); + maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1); return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1)); } finally { lock.writeLock().unlock(); @@ -2203,15 +2203,15 @@ public class SharePartition { // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed. if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { startOffset = cachedState.higherKey(lastOffsetAcknowledged); - if (isInitialReadGapOffsetWindowActive()) { + if (isPersisterReadGapWindowActive()) { // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged. // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked. - // There is a gap from 21 to 30. Let the initialReadGapOffset.gapStartOffset be 21. In this case, + // There is a gap from 21 to 30. Let the gapWindow's gapStartOffset be 21. In this case, // lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset // of next cachedState batch (next cachedState batch is 31 to 40). There is an acquirable gap in between (21 to 30) - // and The startOffset should be at 21. Hence, we set startOffset to the minimum of initialReadGapOffset.gapStartOffset + // and The startOffset should be at 21. Hence, we set startOffset to the minimum of gapWindow.gapStartOffset // and higher key of lastOffsetAcknowledged - startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset); + startOffset = Math.min(persisterReadResultGapWindow.gapStartOffset(), startOffset); } lastKeyToRemove = entry.getKey(); } else { @@ -2276,8 +2276,8 @@ public class SharePartition { return isRecordStateAcknowledged(startOffsetState); } - private boolean isInitialReadGapOffsetWindowActive() { - return initialReadGapOffset != null && initialReadGapOffset.endOffset() == endOffset; + private boolean isPersisterReadGapWindowActive() { + return persisterReadResultGapWindow != null && persisterReadResultGapWindow.endOffset() == endOffset; } /** @@ -2300,7 +2300,7 @@ public class SharePartition { for (NavigableMap.Entry entry : cachedState.entrySet()) { InFlightBatch inFlightBatch = entry.getValue(); - if (isInitialReadGapOffsetWindowActive() && inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) { + if (isPersisterReadGapWindowActive() && inFlightBatch.lastOffset() >= persisterReadResultGapWindow.gapStartOffset()) { return lastOffsetAcknowledged; } @@ -2865,8 +2865,8 @@ public class SharePartition { } // Visible for testing - InitialReadGapOffset initialReadGapOffset() { - return initialReadGapOffset; + GapWindow persisterReadResultGapWindow() { + return persisterReadResultGapWindow; } // Visible for testing. @@ -2875,17 +2875,17 @@ public class SharePartition { } /** - * The InitialReadGapOffset class is used to record the gap start and end offset of the probable gaps + * The GapWindow class is used to record the gap start and end offset of the probable gaps * of available records which are neither known to Persister nor to SharePartition. Share Partition * will use this information to determine the next fetch offset and should try to fetch the records * in the gap. */ // Visible for Testing - static class InitialReadGapOffset { + static class GapWindow { private final long endOffset; private long gapStartOffset; - InitialReadGapOffset(long endOffset, long gapStartOffset) { + GapWindow(long endOffset, long gapStartOffset) { this.endOffset = endOffset; this.gapStartOffset = gapStartOffset; } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 75f27ec5382..47e214a716f 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -17,6 +17,7 @@ package kafka.server.share; import kafka.server.ReplicaManager; +import kafka.server.share.SharePartition.GapWindow; import kafka.server.share.SharePartition.SharePartitionState; import kafka.server.share.SharePartitionManager.SharePartitionListener; @@ -965,11 +966,11 @@ public class SharePartitionTest { assertEquals(3, sharePartition.cachedState().get(21L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(21L).offsetState()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(10, initialReadGapOffset.gapStartOffset()); - assertEquals(30, initialReadGapOffset.endOffset()); + assertEquals(10, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(30, persisterReadResultGapWindow.endOffset()); } @Test @@ -1010,11 +1011,11 @@ public class SharePartitionTest { assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(30L).offsetState()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(10, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + assertEquals(10, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -1051,11 +1052,11 @@ public class SharePartitionTest { assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(30L).offsetState()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(21, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + assertEquals(21, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -1082,10 +1083,10 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(31, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); - // Since there are no gaps present in the readState response, initialReadGapOffset should be null - assertNull(initialReadGapOffset); + // Since there are no gaps present in the readState response, persisterReadResultGapWindow should be null + assertNull(persisterReadResultGapWindow); } @Test @@ -1118,9 +1119,9 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); - assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); + assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. // The records in the batch are from 10 to 49. @@ -1146,8 +1147,8 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(15L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(15L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); // Send the same batch again to acquire the next set of records. acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -1180,8 +1181,8 @@ public class SharePartitionTest { assertEquals(1, sharePartition.cachedState().get(23L).batchDeliveryCount()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState()); assertEquals(30L, sharePartition.endOffset()); - // As all the gaps are now filled, the initialReadGapOffset should be null. - assertNull(sharePartition.initialReadGapOffset()); + // As all the gaps are now filled, the persisterReadResultGapWindow should be null. + assertNull(sharePartition.persisterReadResultGapWindow()); // Now initial read gap is filled, so the complete batch can be acquired despite max fetch records being 1. acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -1234,9 +1235,9 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); - assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); + assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. // The records in the batch are from 10 to 49. @@ -1278,8 +1279,8 @@ public class SharePartitionTest { assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState()); assertEquals(49L, sharePartition.endOffset()); - // As all the gaps are now filled, the initialReadGapOffset should be null. - assertNull(sharePartition.initialReadGapOffset()); + // As all the gaps are now filled, the persisterReadResultGapWindow should be null. + assertNull(sharePartition.persisterReadResultGapWindow()); } @Test @@ -1312,9 +1313,9 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); - assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); + assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create a single batch record that ends in between the cached batch and the fetch offset is // post startOffset. @@ -1357,8 +1358,8 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state()); assertEquals(30L, sharePartition.endOffset()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(28L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(28L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); } @Test @@ -1391,9 +1392,9 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); - assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); + assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create a single batch record where first offset is prior startOffset. MemoryRecords records = memoryRecords(16, 6); @@ -1425,8 +1426,8 @@ public class SharePartitionTest { assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); assertEquals(30L, sharePartition.endOffset()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(20L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(20L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); } @Test @@ -1459,9 +1460,9 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(5L, sharePartition.initialReadGapOffset().gapStartOffset()); - assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(5L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); + assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create multiple batch records that covers the entire range from 5 to 30 of initial read gap. // The records in the batch are from 5 to 49. @@ -1496,8 +1497,8 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(7L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(7L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); // Remove first batch from the records as the fetch offset has moved forward to 7 offset. List batch = TestUtils.toList(records.batches()); @@ -1524,8 +1525,8 @@ public class SharePartitionTest { assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); assertEquals(30L, sharePartition.endOffset()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(12L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(12L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); // Remove the next 2 batches from the records as the fetch offset has moved forward to 12 offset. int size = batch.get(1).sizeInBytes() + batch.get(2).sizeInBytes(); @@ -1561,8 +1562,8 @@ public class SharePartitionTest { assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); assertEquals(30L, sharePartition.endOffset()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(26L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(26L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); // Remove the next 2 batches from the records as the fetch offset has moved forward to 26 offset. // Do not remove the 5th batch as it's only partially acquired. @@ -1590,8 +1591,8 @@ public class SharePartitionTest { assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState()); assertEquals(49L, sharePartition.endOffset()); - // As all the gaps are now filled, the initialReadGapOffset should be null. - assertNull(sharePartition.initialReadGapOffset()); + // As all the gaps are now filled, the persisterReadResultGapWindow should be null. + assertNull(sharePartition.persisterReadResultGapWindow()); } @Test @@ -1624,9 +1625,9 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(5L, sharePartition.initialReadGapOffset().gapStartOffset()); - assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(5L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); + assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create multiple batch records that ends in between the cached batch and the fetch offset is // post startOffset. @@ -1676,8 +1677,8 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state()); assertEquals(30L, sharePartition.endOffset()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(28L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(28L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); } @Test @@ -1710,9 +1711,9 @@ public class SharePartitionTest { assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); - assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(10L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); + assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create multiple batch records where multiple batches base offsets are prior startOffset. ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -1750,8 +1751,8 @@ public class SharePartitionTest { assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); assertEquals(30L, sharePartition.endOffset()); - assertNotNull(sharePartition.initialReadGapOffset()); - assertEquals(20L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertNotNull(sharePartition.persisterReadResultGapWindow()); + assertEquals(20L, sharePartition.persisterReadResultGapWindow().gapStartOffset()); } @Test @@ -3034,12 +3035,12 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(16, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // After records are acquired, the initialReadGapOffset should be updated - assertEquals(16, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + // After records are acquired, the persisterReadResultGapWindow should be updated + assertEquals(16, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -3073,12 +3074,12 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(41, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // After records are acquired, the initialReadGapOffset should be updated - assertEquals(21, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + // After records are acquired, the persisterReadResultGapWindow should be updated + assertEquals(21, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -3126,12 +3127,12 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(26, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // After records are acquired, the initialReadGapOffset should be updated - assertEquals(26, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + // After records are acquired, the persisterReadResultGapWindow should be updated + assertEquals(26, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -3170,12 +3171,12 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(26, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // After records are acquired, the initialReadGapOffset should be updated - assertEquals(26, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + // After records are acquired, the persisterReadResultGapWindow should be updated + assertEquals(26, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -3227,12 +3228,12 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(86, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // After records are acquired, the initialReadGapOffset should be updated - assertEquals(86, initialReadGapOffset.gapStartOffset()); - assertEquals(90, initialReadGapOffset.endOffset()); + // After records are acquired, the persisterReadResultGapWindow should be updated + assertEquals(86, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(90, persisterReadResultGapWindow.endOffset()); } @Test @@ -3271,12 +3272,12 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(31, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // After records are acquired, the initialReadGapOffset should be updated - assertEquals(31, initialReadGapOffset.gapStartOffset()); - assertEquals(70, initialReadGapOffset.endOffset()); + // After records are acquired, the persisterReadResultGapWindow should be updated + assertEquals(31, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(70, persisterReadResultGapWindow.endOffset()); } @Test @@ -3322,12 +3323,12 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(76, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // After records are acquired, the initialReadGapOffset should be updated - assertEquals(76, initialReadGapOffset.gapStartOffset()); - assertEquals(90, initialReadGapOffset.endOffset()); + // After records are acquired, the persisterReadResultGapWindow should be updated + assertEquals(76, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(90, persisterReadResultGapWindow.endOffset()); } @@ -3375,11 +3376,11 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(27, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(27, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + assertEquals(27, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -3424,11 +3425,11 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(21, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(21, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + assertEquals(21, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -3473,11 +3474,11 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(21, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(21, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + assertEquals(21, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -3525,8 +3526,8 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(51, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNull(persisterReadResultGapWindow); } @Test @@ -3569,8 +3570,8 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(61, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNull(persisterReadResultGapWindow); } @Test @@ -3615,8 +3616,8 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(61, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNull(persisterReadResultGapWindow); } @Test @@ -3664,8 +3665,8 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(61, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNull(persisterReadResultGapWindow); } @Test @@ -3705,11 +3706,11 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(41, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(31, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + assertEquals(31, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); // Fetching from the nextFetchOffset so that endOffset moves ahead records = memoryRecords(15, 41); @@ -3725,9 +3726,9 @@ public class SharePartitionTest { assertEquals(3, sharePartition.stateEpoch()); assertEquals(56, sharePartition.nextFetchOffset()); - // Since the endOffset is now moved ahead, the initialReadGapOffset should be empty - initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNull(initialReadGapOffset); + // Since the endOffset is now moved ahead, the persisterReadResultGapWindow should be empty + persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNull(persisterReadResultGapWindow); } @Test @@ -6782,11 +6783,11 @@ public class SharePartitionTest { assertEquals(40, sharePartition.endOffset()); assertEquals(21, sharePartition.nextFetchOffset()); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - assertEquals(21, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + assertEquals(21, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); } @Test @@ -7356,16 +7357,16 @@ public class SharePartitionTest { sharePartition.maybeInitialize(); - SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); - assertNotNull(initialReadGapOffset); + GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); + assertNotNull(persisterReadResultGapWindow); - // Since there is a gap in the beginning, the initialReadGapOffset window is same as the cachedState - assertEquals(11, initialReadGapOffset.gapStartOffset()); - assertEquals(40, initialReadGapOffset.endOffset()); + // Since there is a gap in the beginning, the persisterReadResultGapWindow window is same as the cachedState + assertEquals(11, persisterReadResultGapWindow.gapStartOffset()); + assertEquals(40, persisterReadResultGapWindow.endOffset()); long lastOffsetAcknowledged = sharePartition.findLastOffsetAcknowledged(); - // Since the initialReadGapOffset window begins at startOffset, we cannot count any of the offsets as acknowledged. + // Since the persisterReadResultGapWindow window begins at startOffset, we cannot count any of the offsets as acknowledged. // Thus, lastOffsetAcknowledged should be -1 assertEquals(-1, lastOffsetAcknowledged); }