KAFKA-19630: Reordered OR operands in archiveRecords method for SharePartiton (#20391)

As per the current implementation in archiveRecords, when LSO is
updated, if we have multiple record batches before the new LSO, then
only the first one gets archived. This is because of the following lines
of code ->

`isAnyOffsetArchived = isAnyOffsetArchived ||
archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1,
initialState);`

`isAnyBatchArchived = isAnyBatchArchived ||
archiveCompleteBatch(inFlightBatch, initialState);`

The first record / batch will make `isAnyOffsetArchived` /
`isAnyBatchArchived` true, after which this line of code will
short-circuit and the methods `archivePerOffsetBatchRecords` /
`archiveCompleteBatch` will not be called again. This PR changes the
order of the expressions so that the short-circuit does not prevent from
archiving all the required batches.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Chirag Wadhwa 2025-08-22 13:53:12 +05:30 committed by GitHub
parent eeb6a0d981
commit def5f16c33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 118 additions and 2 deletions

View File

@ -1202,11 +1202,11 @@ public class SharePartition {
}
inFlightBatch.maybeInitializeOffsetStateUpdate();
}
isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);
isAnyOffsetArchived = archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState) || isAnyOffsetArchived;
continue;
}
// The in-flight batch is a full match hence change the state of the complete batch.
isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);
isAnyBatchArchived = archiveCompleteBatch(inFlightBatch, initialState) || isAnyBatchArchived;
}
return isAnyOffsetArchived || isAnyBatchArchived;
} finally {

View File

@ -4493,6 +4493,122 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(32L).batchAcquisitionLockTimeoutTask());
}
@Test
public void testLsoMovementForArchivingAllAvailableBatches() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
// A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50.
fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10);
// After the acknowledgements, the state of share partition will be:
// 1. 11 -> 20: AVAILABLE
// 2. 21 -> 30: ACQUIRED
// 3. 31 -> 40: AVAILABLE
// 4. 41 -> 50: ACQUIRED
sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
new ShareAcknowledgementBatch(31, 40, List.of((byte) 2))
));
// Move the LSO to 41. When the LSO moves ahead, all batches that are AVAILABLE before the new LSO will be ARCHIVED.
// Thus, the state of the share partition will be:
// 1. 11 -> 20: ARCHIVED
// 2. 21 -> 30: ACQUIRED
// 3. 31 -> 40: ARCHIVED
// 4. 41 -> 50: ACQUIRED
// Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal
// state when the corresponding acquisition lock timer task expires.
sharePartition.updateCacheAndOffsets(41);
assertEquals(51, sharePartition.nextFetchOffset());
assertEquals(41, sharePartition.startOffset());
assertEquals(50, sharePartition.endOffset());
assertEquals(4, sharePartition.cachedState().size());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState());
// The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these
// records will remain in the ACQUIRED state.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2))));
// The batch is still in ACQUIRED state.
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
// Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be
// ARCHIVED.
sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run();
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState());
}
@Test
public void testLsoMovementForArchivingAllAvailableOffsets() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
// A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50.
fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10);
// After the acknowledgements, the share partition state will be:
// 1. 11 -> 20: AVAILABLE
// 2. 21 -> 30: ACQUIRED
// 3. 31 -> 40: AVAILABLE
// 4. 41 -> 50: ACQUIRED
sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
new ShareAcknowledgementBatch(31, 40, List.of((byte) 2))
));
// Move the LSO to 36. When the LSO moves ahead, all records that are AVAILABLE before the new LSO will be ARCHIVED.
// Thus, the state of the share partition will be:
// 1. 11 -> 20: ARCHIVED
// 2. 21 -> 30: ACQUIRED
// 3. 31 -> 35: ARCHIVED
// 3. 36 -> 40: AVAILABLE
// 4. 41 -> 50: ACQUIRED
// Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal
// state when the corresponding acquisition lock timer task expires.
sharePartition.updateCacheAndOffsets(36);
assertEquals(36, sharePartition.nextFetchOffset());
assertEquals(36, sharePartition.startOffset());
assertEquals(50, sharePartition.endOffset());
assertEquals(4, sharePartition.cachedState().size());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(31L).state());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(32L).state());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(33L).state());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(34L).state());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(35L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(36L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(37L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(38L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(39L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(40L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState());
// The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these
// records will remain in the ACQUIRED state.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2))));
// The batch is still in ACQUIRED state.
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
// Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be
// ARCHIVED.
sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run();
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState());
}
@Test
public void testLsoMovementForArchivingOffsets() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();