From def5f16c338fa06f295466dfb19744f9acadcc06 Mon Sep 17 00:00:00 2001 From: Chirag Wadhwa Date: Fri, 22 Aug 2025 13:53:12 +0530 Subject: [PATCH] KAFKA-19630: Reordered OR operands in archiveRecords method for SharePartiton (#20391) As per the current implementation in archiveRecords, when LSO is updated, if we have multiple record batches before the new LSO, then only the first one gets archived. This is because of the following lines of code -> `isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);` `isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);` The first record / batch will make `isAnyOffsetArchived` / `isAnyBatchArchived` true, after which this line of code will short-circuit and the methods `archivePerOffsetBatchRecords` / `archiveCompleteBatch` will not be called again. This PR changes the order of the expressions so that the short-circuit does not prevent from archiving all the required batches. Reviewers: Apoorv Mittal --- .../kafka/server/share/SharePartition.java | 4 +- .../server/share/SharePartitionTest.java | 116 ++++++++++++++++++ 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 68ced23a5f7..a07f9a12fbb 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -1202,11 +1202,11 @@ public class SharePartition { } inFlightBatch.maybeInitializeOffsetStateUpdate(); } - isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState); + isAnyOffsetArchived = archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState) || isAnyOffsetArchived; continue; } // The in-flight batch is a full match hence change the state of the complete batch. - isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState); + isAnyBatchArchived = archiveCompleteBatch(inFlightBatch, initialState) || isAnyBatchArchived; } return isAnyOffsetArchived || isAnyBatchArchived; } finally { diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 30b49e17b16..ba24f3b2595 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -4493,6 +4493,122 @@ public class SharePartitionTest { assertNotNull(sharePartition.cachedState().get(32L).batchAcquisitionLockTimeoutTask()); } + @Test + public void testLsoMovementForArchivingAllAvailableBatches() { + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + + // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50. + fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10); + + // After the acknowledgements, the state of share partition will be: + // 1. 11 -> 20: AVAILABLE + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 40: AVAILABLE + // 4. 41 -> 50: ACQUIRED + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)), + new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)) + )); + + // Move the LSO to 41. When the LSO moves ahead, all batches that are AVAILABLE before the new LSO will be ARCHIVED. + // Thus, the state of the share partition will be: + // 1. 11 -> 20: ARCHIVED + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 40: ARCHIVED + // 4. 41 -> 50: ACQUIRED + // Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal + // state when the corresponding acquisition lock timer task expires. + sharePartition.updateCacheAndOffsets(41); + + assertEquals(51, sharePartition.nextFetchOffset()); + assertEquals(41, sharePartition.startOffset()); + assertEquals(50, sharePartition.endOffset()); + + assertEquals(4, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState()); + + // The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these + // records will remain in the ACQUIRED state. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2)))); + + // The batch is still in ACQUIRED state. + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + + // Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be + // ARCHIVED. + sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run(); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState()); + } + + @Test + public void testLsoMovementForArchivingAllAvailableOffsets() { + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + + // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50. + fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10); + + // After the acknowledgements, the share partition state will be: + // 1. 11 -> 20: AVAILABLE + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 40: AVAILABLE + // 4. 41 -> 50: ACQUIRED + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)), + new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)) + )); + + // Move the LSO to 36. When the LSO moves ahead, all records that are AVAILABLE before the new LSO will be ARCHIVED. + // Thus, the state of the share partition will be: + // 1. 11 -> 20: ARCHIVED + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 35: ARCHIVED + // 3. 36 -> 40: AVAILABLE + // 4. 41 -> 50: ACQUIRED + // Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal + // state when the corresponding acquisition lock timer task expires. + sharePartition.updateCacheAndOffsets(36); + + assertEquals(36, sharePartition.nextFetchOffset()); + assertEquals(36, sharePartition.startOffset()); + assertEquals(50, sharePartition.endOffset()); + + assertEquals(4, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(31L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(32L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(33L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(34L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(35L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(36L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(37L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(38L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(39L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(40L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState()); + + // The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these + // records will remain in the ACQUIRED state. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2)))); + + // The batch is still in ACQUIRED state. + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + + // Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be + // ARCHIVED. + sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run(); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState()); + } + @Test public void testLsoMovementForArchivingOffsets() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();