KAFKA-19463: nextFetchOffset does not take ongoing state transition into account (#20080)
CI / build (push) Waiting to run Details

### About
`nextFetchOffset` function in `SharePartition` updates the fetch offsets
without considering batches/offsets which might be undergoing state
transition. This can cause problems in updating to the right fetch
offset.

### Testing
The new code added has been tested with the help of unit tests.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Abhinav Dixit 2025-07-02 22:39:43 +05:30 committed by GitHub
parent 42041f4772
commit 7cb370b786
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 176 additions and 2 deletions

View File

@ -641,14 +641,14 @@ public class SharePartition {
// Check if the state is maintained per offset or batch. If the offsetState
// is not maintained then the batch state is used to determine the offsets state.
if (entry.getValue().offsetState() == null) {
if (entry.getValue().batchState() == RecordState.AVAILABLE) {
if (entry.getValue().batchState() == RecordState.AVAILABLE && !entry.getValue().batchHasOngoingStateTransition()) {
nextFetchOffset = entry.getValue().firstOffset();
break;
}
} else {
// The offset state is maintained hence find the next available offset.
for (Map.Entry<Long, InFlightState> offsetState : entry.getValue().offsetState().entrySet()) {
if (offsetState.getValue().state == RecordState.AVAILABLE) {
if (offsetState.getValue().state == RecordState.AVAILABLE && !offsetState.getValue().hasOngoingStateTransition()) {
nextFetchOffset = offsetState.getKey();
break;
}
@ -2088,6 +2088,9 @@ public class SharePartition {
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
if (state.state == RecordState.AVAILABLE) {
findNextFetchOffset.set(true);
}
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();

View File

@ -7296,6 +7296,177 @@ public class SharePartitionTest {
assertNull(sharePartition.fetchLock()); // Fetch lock has been released.
}
@Test
public void testAcquireWhenBatchHasOngoingTransition() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withPersister(persister)
.build();
// Acquire a single batch with member-1.
fetchAcquiredRecords(
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
), 10
);
// Validate that there is no ongoing transition.
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
// Return a future which will be completed later, so the batch state has ongoing transition.
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
// Acknowledge batch to create ongoing transition.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id))));
// Assert the start offset has not moved and batch has ongoing transition.
assertEquals(21L, sharePartition.startOffset());
assertEquals(1, sharePartition.cachedState().size());
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId());
// Acquire the same batch with member-2. This function call will return with 0 records since there is an ongoing
// transition for this batch.
fetchAcquiredRecords(
sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21,
fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
), 0
);
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId());
// Complete the future so acknowledge API can be completed, which updates the cache. Now the records can be acquired.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
future.complete(writeShareGroupStateResult);
// Acquire the same batch with member-2. 10 records will be acquired.
fetchAcquiredRecords(
sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21,
fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
), 10
);
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
assertEquals("member-2", sharePartition.cachedState().get(21L).batchMemberId());
}
@Test
public void testNextFetchOffsetWhenBatchHasOngoingTransition() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withPersister(persister)
.build();
// Acquire a single batch 0-9 with member-1.
fetchAcquiredRecords(
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM
), 10
);
// Acquire a single batch 10-19 with member-1.
fetchAcquiredRecords(
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM
), 10
);
// Validate that there is no ongoing transition.
assertEquals(2, sharePartition.cachedState().size());
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
// Return futures which will be completed later, so the batch state has ongoing transition.
CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>();
CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>();
// Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for
// offsets 0-9 and 10-19 respectively.
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
// Acknowledge batch to create ongoing transition.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RELEASE.id))));
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(10, 19, List.of(AcknowledgeType.RELEASE.id))));
// Complete future2 so second acknowledge API can be completed, which updates the cache.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
future2.complete(writeShareGroupStateResult);
// Offsets 0-9 will have ongoing state transition since future1 is not complete yet.
// Offsets 10-19 won't have ongoing state transition since future2 has been completed.
assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
// nextFetchOffset should return 10 and not 0 because batch 0-9 is undergoing state transition.
assertEquals(10, sharePartition.nextFetchOffset());
}
@Test
public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withPersister(persister)
.build();
// Acquire a single batch 0-50 with member-1.
fetchAcquiredRecords(
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
fetchPartitionData(memoryRecords(50, 0)), FETCH_ISOLATION_HWM
), 50
);
// Validate that there is no ongoing transition.
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
// Return futures which will be completed later, so the batch state has ongoing transition.
CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>();
CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>();
// Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for
// offsets 5-9 and 20-24 respectively.
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
// Acknowledge batch to create ongoing transition.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id))));
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id))));
// Complete future2 so second acknowledge API can be completed, which updates the cache.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
future2.complete(writeShareGroupStateResult);
// Offsets 5-9 will have ongoing state transition since future1 is not complete yet.
// Offsets 20-24 won't have ongoing state transition since future2 has been completed.
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(5L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(6L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(7L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(8L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(9L).hasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(20L).hasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(21L).hasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(22L).hasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(23L).hasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(24L).hasOngoingStateTransition());
// nextFetchOffset should return 20 and not 5 because offsets 5-9 is undergoing state transition.
assertEquals(20, sharePartition.nextFetchOffset());
}
/**
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
*/