From 7cb370b786173d6f58c8d9be3f8ff4448855b62e Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Wed, 2 Jul 2025 22:39:43 +0530 Subject: [PATCH] KAFKA-19463: nextFetchOffset does not take ongoing state transition into account (#20080) ### About `nextFetchOffset` function in `SharePartition` updates the fetch offsets without considering batches/offsets which might be undergoing state transition. This can cause problems in updating to the right fetch offset. ### Testing The new code added has been tested with the help of unit tests. Reviewers: Apoorv Mittal --- .../kafka/server/share/SharePartition.java | 7 +- .../server/share/SharePartitionTest.java | 171 ++++++++++++++++++ 2 files changed, 176 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 6e2abb1d306..8c4ac930d18 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -641,14 +641,14 @@ public class SharePartition { // Check if the state is maintained per offset or batch. If the offsetState // is not maintained then the batch state is used to determine the offsets state. if (entry.getValue().offsetState() == null) { - if (entry.getValue().batchState() == RecordState.AVAILABLE) { + if (entry.getValue().batchState() == RecordState.AVAILABLE && !entry.getValue().batchHasOngoingStateTransition()) { nextFetchOffset = entry.getValue().firstOffset(); break; } } else { // The offset state is maintained hence find the next available offset. for (Map.Entry offsetState : entry.getValue().offsetState().entrySet()) { - if (offsetState.getValue().state == RecordState.AVAILABLE) { + if (offsetState.getValue().state == RecordState.AVAILABLE && !offsetState.getValue().hasOngoingStateTransition()) { nextFetchOffset = offsetState.getKey(); break; } @@ -2088,6 +2088,9 @@ public class SharePartition { state.completeStateTransition(true); // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully. state.cancelAndClearAcquisitionLockTimeoutTask(); + if (state.state == RecordState.AVAILABLE) { + findNextFetchOffset.set(true); + } }); // Update the cached state and start and end offsets after acknowledging/releasing the acquired records. cacheStateUpdated = maybeUpdateCachedStateAndOffsets(); diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 2c77e98d72a..53b1ee27919 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -7296,6 +7296,177 @@ public class SharePartitionTest { assertNull(sharePartition.fetchLock()); // Fetch lock has been released. } + @Test + public void testAcquireWhenBatchHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + // Acquire a single batch with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + // Return a future which will be completed later, so the batch state has ongoing transition. + CompletableFuture future = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id)))); + + // Assert the start offset has not moved and batch has ongoing transition. + assertEquals(21L, sharePartition.startOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId()); + + // Acquire the same batch with member-2. This function call will return with 0 records since there is an ongoing + // transition for this batch. + fetchAcquiredRecords( + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 0 + ); + + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId()); + + // Complete the future so acknowledge API can be completed, which updates the cache. Now the records can be acquired. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future.complete(writeShareGroupStateResult); + + // Acquire the same batch with member-2. 10 records will be acquired. + fetchAcquiredRecords( + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + assertEquals("member-2", sharePartition.cachedState().get(21L).batchMemberId()); + } + + @Test + public void testNextFetchOffsetWhenBatchHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + // Acquire a single batch 0-9 with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, + fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Acquire a single batch 10-19 with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, + fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertEquals(2, sharePartition.cachedState().size()); + assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for + // offsets 0-9 and 10-19 respectively. + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RELEASE.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(10, 19, List.of(AcknowledgeType.RELEASE.id)))); + + // Complete future2 so second acknowledge API can be completed, which updates the cache. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future2.complete(writeShareGroupStateResult); + + // Offsets 0-9 will have ongoing state transition since future1 is not complete yet. + // Offsets 10-19 won't have ongoing state transition since future2 has been completed. + assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition()); + + // nextFetchOffset should return 10 and not 0 because batch 0-9 is undergoing state transition. + assertEquals(10, sharePartition.nextFetchOffset()); + } + + @Test + public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + // Acquire a single batch 0-50 with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, + fetchPartitionData(memoryRecords(50, 0)), FETCH_ISOLATION_HWM + ), 50 + ); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for + // offsets 5-9 and 20-24 respectively. + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)))); + + // Complete future2 so second acknowledge API can be completed, which updates the cache. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future2.complete(writeShareGroupStateResult); + + // Offsets 5-9 will have ongoing state transition since future1 is not complete yet. + // Offsets 20-24 won't have ongoing state transition since future2 has been completed. + assertTrue(sharePartition.cachedState().get(0L).offsetState().get(5L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(0L).offsetState().get(6L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(0L).offsetState().get(7L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(0L).offsetState().get(8L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(0L).offsetState().get(9L).hasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(0L).offsetState().get(20L).hasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(0L).offsetState().get(21L).hasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(0L).offsetState().get(22L).hasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(0L).offsetState().get(23L).hasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(0L).offsetState().get(24L).hasOngoingStateTransition()); + + // nextFetchOffset should return 20 and not 5 because offsets 5-9 is undergoing state transition. + assertEquals(20, sharePartition.nextFetchOffset()); + } + /** * This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT). */