KAFKA-19436: Restrict cache update for ongoing batch/offset state (#20041) (#20647)
CI / build (push) Waiting to run Details

Cherry-pick commit from
https://github.com/apache/kafka/commit/96ef1c520a

In the stress testing it was noticed that on acquisition lock timeout,
some offsets were not found in the cache. The cache can be tried to be
updated in different acknowledgement calls hence if there is an ongoing
transition which is not yet finished but another parallel
acknowledgement triggers the cache update then the cache can be updated
incorrectly, while first transition is not yet finished.

Though the cache update happens for Archived and Acknowldeged records
hence this issue or existing implementation should not hamper the queues
functionality. But it might update the cache early when persister call
might fail or this issue triggers error logs with offset not found in
cache when acquisition lock timeouts (in some scenarios).

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield
 <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-10-07 14:45:58 +01:00 committed by GitHub
parent ce248ab0d6
commit 978672b724
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 115 additions and 6 deletions

View File

@ -2178,7 +2178,8 @@ public class SharePartition {
}
}
private boolean canMoveStartOffset() {
// Visible for testing.
boolean canMoveStartOffset() {
// The Share Partition Start Offset may be moved after acknowledge request is complete.
// The following conditions need to be met to move the startOffset:
// 1. When the cachedState is not empty.
@ -2203,7 +2204,15 @@ public class SharePartition {
"as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition);
return false;
}
RecordState startOffsetState = entry.getValue().offsetState == null ?
boolean isBatchState = entry.getValue().offsetState() == null;
boolean isOngoingTransition = isBatchState ?
entry.getValue().batchHasOngoingStateTransition() :
entry.getValue().offsetState().get(startOffset).hasOngoingStateTransition();
if (isOngoingTransition) {
return false;
}
RecordState startOffsetState = isBatchState ?
entry.getValue().batchState() :
entry.getValue().offsetState().get(startOffset).state();
return isRecordStateAcknowledged(startOffsetState);
@ -2238,13 +2247,13 @@ public class SharePartition {
}
if (inFlightBatch.offsetState() == null) {
if (!isRecordStateAcknowledged(inFlightBatch.batchState())) {
if (inFlightBatch.batchHasOngoingStateTransition() || !isRecordStateAcknowledged(inFlightBatch.batchState())) {
return lastOffsetAcknowledged;
}
lastOffsetAcknowledged = inFlightBatch.lastOffset();
} else {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
if (!isRecordStateAcknowledged(offsetState.getValue().state())) {
if (offsetState.getValue().hasOngoingStateTransition() || !isRecordStateAcknowledged(offsetState.getValue().state())) {
return lastOffsetAcknowledged;
}
lastOffsetAcknowledged = offsetState.getKey();
@ -2913,7 +2922,8 @@ public class SharePartition {
return batchState;
}
private boolean batchHasOngoingStateTransition() {
// Visible for testing.
boolean batchHasOngoingStateTransition() {
return inFlightState().hasOngoingStateTransition();
}
@ -3034,7 +3044,8 @@ public class SharePartition {
acquisitionLockTimeoutTask = null;
}
private boolean hasOngoingStateTransition() {
// Visible for testing.
boolean hasOngoingStateTransition() {
if (rollbackState == null) {
// This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
// been committed.
@ -3067,6 +3078,7 @@ public class SharePartition {
return this;
} catch (IllegalStateException e) {
log.error("Failed to update state of the records", e);
rollbackState = null;
return null;
}
}

View File

@ -6479,6 +6479,103 @@ public class SharePartitionTest {
assertEquals(-1, lastOffsetAcknowledged);
}
@Test
public void testCacheUpdateWhenBatchHasOngoingTransition() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withPersister(persister)
.build();
// Acquire a single batch.
fetchAcquiredRecords(
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
), 10
);
// Validate that there is no ongoing transition.
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
// Return a future which will be completed later, so the batch state has ongoing transition.
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
// Acknowledge batch to create ongoing transition.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.ACCEPT.id))));
// Assert the start offset has not moved and batch has ongoing transition.
assertEquals(21L, sharePartition.startOffset());
assertEquals(1, sharePartition.cachedState().size());
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
// Validate that offset can't be moved because batch has ongoing transition.
assertFalse(sharePartition.canMoveStartOffset());
assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
// Complete the future so acknowledge API can be completed, which updates the cache.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
future.complete(writeShareGroupStateResult);
// Validate the cache has been updated.
assertEquals(31L, sharePartition.startOffset());
assertTrue(sharePartition.cachedState().isEmpty());
}
@Test
public void testCacheUpdateWhenOffsetStateHasOngoingTransition() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withPersister(persister)
.build();
// Acquire a single batch.
fetchAcquiredRecords(
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
), 10
);
// Validate that there is no ongoing transition.
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
assertNull(sharePartition.cachedState().get(21L).offsetState());
// Return a future which will be completed later, so the batch state has ongoing transition.
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
// Acknowledge offsets to create ongoing transition.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 23, List.of(AcknowledgeType.ACCEPT.id))));
// Assert the start offset has not moved and offset state is now maintained. Offset state should
// have ongoing transition.
assertEquals(21L, sharePartition.startOffset());
assertEquals(1, sharePartition.cachedState().size());
assertNotNull(sharePartition.cachedState().get(21L).offsetState());
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(21L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(22L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(23L).hasOngoingStateTransition());
// Only 21, 22 and 23 offsets should have ongoing state transition as the acknowledge request
// contains 21-23 offsets.
assertFalse(sharePartition.cachedState().get(21L).offsetState().get(24L).hasOngoingStateTransition());
// Validate that offset can't be moved because batch has ongoing transition.
assertFalse(sharePartition.canMoveStartOffset());
assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
// Complete the future so acknowledge API can be completed, which updates the cache.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
future.complete(writeShareGroupStateResult);
// Validate the cache has been updated.
assertEquals(24L, sharePartition.startOffset());
assertEquals(1, sharePartition.cachedState().size());
assertNotNull(sharePartition.cachedState().get(21L));
}
/**
* Test the case where the fetch batch has first record offset greater than the record batch start offset.
* Such batches can exist for compacted topics.