diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 09385d6c48c..eb0fedbe8cd 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -2178,7 +2178,8 @@ public class SharePartition { } } - private boolean canMoveStartOffset() { + // Visible for testing. + boolean canMoveStartOffset() { // The Share Partition Start Offset may be moved after acknowledge request is complete. // The following conditions need to be met to move the startOffset: // 1. When the cachedState is not empty. @@ -2203,7 +2204,15 @@ public class SharePartition { "as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition); return false; } - RecordState startOffsetState = entry.getValue().offsetState == null ? + boolean isBatchState = entry.getValue().offsetState() == null; + boolean isOngoingTransition = isBatchState ? + entry.getValue().batchHasOngoingStateTransition() : + entry.getValue().offsetState().get(startOffset).hasOngoingStateTransition(); + if (isOngoingTransition) { + return false; + } + + RecordState startOffsetState = isBatchState ? 
entry.getValue().batchState() : entry.getValue().offsetState().get(startOffset).state(); return isRecordStateAcknowledged(startOffsetState); @@ -2238,13 +2247,13 @@ public class SharePartition { } if (inFlightBatch.offsetState() == null) { - if (!isRecordStateAcknowledged(inFlightBatch.batchState())) { + if (inFlightBatch.batchHasOngoingStateTransition() || !isRecordStateAcknowledged(inFlightBatch.batchState())) { return lastOffsetAcknowledged; } lastOffsetAcknowledged = inFlightBatch.lastOffset(); } else { for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) { - if (!isRecordStateAcknowledged(offsetState.getValue().state())) { + if (offsetState.getValue().hasOngoingStateTransition() || !isRecordStateAcknowledged(offsetState.getValue().state())) { return lastOffsetAcknowledged; } lastOffsetAcknowledged = offsetState.getKey(); @@ -2913,7 +2922,8 @@ public class SharePartition { return batchState; } - private boolean batchHasOngoingStateTransition() { + // Visible for testing. + boolean batchHasOngoingStateTransition() { return inFlightState().hasOngoingStateTransition(); } @@ -3034,7 +3044,8 @@ public class SharePartition { acquisitionLockTimeoutTask = null; } - private boolean hasOngoingStateTransition() { + // Visible for testing. + boolean hasOngoingStateTransition() { if (rollbackState == null) { // This case could occur when the batch/offset hasn't transitioned even once or the state transitions have // been committed. 
@@ -3067,6 +3078,7 @@ public class SharePartition { return this; } catch (IllegalStateException e) { log.error("Failed to update state of the records", e); + rollbackState = null; return null; } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 0cc7ad4d9a1..2c77e98d72a 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -6479,6 +6479,103 @@ public class SharePartitionTest { assertEquals(-1, lastOffsetAcknowledged); } + @Test + public void testCacheUpdateWhenBatchHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + // Acquire a single batch. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + // Return a future which will be completed later, so the batch state has ongoing transition. + CompletableFuture future = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.ACCEPT.id)))); + + // Assert the start offset has not moved and batch has ongoing transition. + assertEquals(21L, sharePartition.startOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + + // Validate that offset can't be moved because batch has ongoing transition. 
+ assertFalse(sharePartition.canMoveStartOffset()); + assertEquals(-1, sharePartition.findLastOffsetAcknowledged()); + + // Complete the future so acknowledge API can be completed, which updates the cache. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future.complete(writeShareGroupStateResult); + + // Validate the cache has been updated. + assertEquals(31L, sharePartition.startOffset()); + assertTrue(sharePartition.cachedState().isEmpty()); + } + + @Test + public void testCacheUpdateWhenOffsetStateHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + // Acquire a single batch. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + assertNull(sharePartition.cachedState().get(21L).offsetState()); + // Return a future which will be completed later, so the batch state has ongoing transition. + CompletableFuture future = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); + // Acknowledge offsets to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 23, List.of(AcknowledgeType.ACCEPT.id)))); + + // Assert the start offset has not moved and offset state is now maintained. Offset state should + // have ongoing transition. 
+ assertEquals(21L, sharePartition.startOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(21L).offsetState()); + assertTrue(sharePartition.cachedState().get(21L).offsetState().get(21L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(21L).offsetState().get(22L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(21L).offsetState().get(23L).hasOngoingStateTransition()); + // Only 21, 22 and 23 offsets should have ongoing state transition as the acknowledge request + // contains 21-23 offsets. + assertFalse(sharePartition.cachedState().get(21L).offsetState().get(24L).hasOngoingStateTransition()); + + // Validate that the start offset can't be moved because offsets 21-23 have an ongoing transition. + assertFalse(sharePartition.canMoveStartOffset()); + assertEquals(-1, sharePartition.findLastOffsetAcknowledged()); + + // Complete the future so acknowledge API can be completed, which updates the cache. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future.complete(writeShareGroupStateResult); + + // Validate the cache has been updated. + assertEquals(24L, sharePartition.startOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(21L)); + } + /** * Test the case where the fetch batch has first record offset greater than the record batch start offset. * Such batches can exist for compacted topics.