KAFKA-19476: Concurrent execution fixes for lock timeout and lso movement (#20286)
CI / build (push) Has been cancelled Details

The PR fixes following:

1. In case share partition arrive at a state which should be treated as
final state
of that batch/offset (example - LSO movement which causes offset/batch
to be ARCHIVED permanently), the result of pending write state RPCs for
that offset/batch override the ARCHIVED state. Hence track such updates
and apply when transition is completed.

2. If an acquisition lock timeout occurs while an offset/batch is
undergoing transition followed by write state RPC failure, then
respective batch/offset can
land in a scenario where the offset stays in ACQUIRED state with no
acquisition lock timeout task.

3. If a timer task is cancelled, but due to concurrent execution of
timer task and acknowledgement, there can be a scenario when timer task
has processed post cancellation. Hence it can mark the offset/batch
re-avaialble despite already acknowledged.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
 <adixit@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-08-01 23:20:25 +01:00 committed by GitHub
parent b909544e99
commit 05d71ad1a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 425 additions and 42 deletions

View File

@ -241,6 +241,12 @@ public class SharePartition {
*/
private final AcquisitionLockTimeoutHandler timeoutHandler;
/**
* The replica manager is used to check to see if any delayed share fetch request can be completed because of data
* availability due to acquisition lock timeout.
*/
private final ReplicaManager replicaManager;
/**
* The share partition start offset specifies the partition start offset from which the records
* are cached in the cachedState of the sharePartition.
@ -295,12 +301,6 @@ public class SharePartition {
*/
private long fetchLockIdleDurationMs;
/**
* The replica manager is used to check to see if any delayed share fetch request can be completed because of data
* availability due to acquisition lock timeout.
*/
private final ReplicaManager replicaManager;
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
@ -1245,10 +1245,7 @@ public class SharePartition {
continue;
}
offsetState.getValue().archive(EMPTY_MEMBER_ID);
if (initialState == RecordState.ACQUIRED) {
offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
}
offsetState.getValue().archive();
isAnyOffsetArchived = true;
}
return isAnyOffsetArchived;
@ -1263,10 +1260,7 @@ public class SharePartition {
log.trace("Archiving complete batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
if (inFlightBatch.batchState() == initialState) {
// Change the state of complete batch since the same state exists for the entire inFlight batch.
inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
if (initialState == RecordState.ACQUIRED) {
inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
}
inFlightBatch.archiveBatch();
return true;
}
} finally {
@ -1799,6 +1793,12 @@ public class SharePartition {
if (throwable.isPresent()) {
return throwable;
}
if (inFlightBatch.batchHasOngoingStateTransition()) {
log.debug("The batch has on-going transition, batch: {} for the share "
+ "partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException("The record state is invalid. The acknowledgement of delivery could not be completed."));
}
}
// Determine if the in-flight batch is a full match from the request batch.
@ -1899,7 +1899,15 @@ public class SharePartition {
+ " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId,
topicIdPartition);
return Optional.of(new InvalidRecordStateException(
"The batch cannot be acknowledged. The offset is not acquired."));
"The offset cannot be acknowledged. The offset is not acquired."));
}
if (offsetState.getValue().hasOngoingStateTransition()) {
log.debug("The offset has on-going transition, offset: {} batch: {} for the share"
+ " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId,
topicIdPartition);
return Optional.of(new InvalidRecordStateException(
"The record state is invalid. The acknowledgement of delivery could not be completed."));
}
// Check if member id is the owner of the offset.
@ -2044,7 +2052,12 @@ public class SharePartition {
// Log in DEBUG to avoid flooding of logs for a faulty client.
log.debug("Request failed for updating state, rollback any changed state"
+ " for the share partition: {}-{}", groupId, topicIdPartition);
updatedStates.forEach(state -> state.completeStateTransition(false));
updatedStates.forEach(state -> {
state.completeStateTransition(false);
if (state.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
});
future.completeExceptionally(throwable);
return;
}
@ -2067,7 +2080,14 @@ public class SharePartition {
if (exception != null) {
log.debug("Failed to write state to persister for the share partition: {}-{}",
groupId, topicIdPartition, exception);
updatedStates.forEach(state -> state.completeStateTransition(false));
// In case of failure when transition state is rolled back then it should be rolled
// back to ACQUIRED state, unless acquisition lock for the state has expired.
updatedStates.forEach(state -> {
state.completeStateTransition(false);
if (state.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
});
future.completeExceptionally(exception);
return;
}
@ -2076,8 +2096,6 @@ public class SharePartition {
groupId, topicIdPartition);
updatedStates.forEach(state -> {
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
if (state.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
@ -2389,10 +2407,18 @@ public class SharePartition {
}
private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
return (memberId, firstOffset, lastOffset) -> {
return (memberId, firstOffset, lastOffset, timerTask) -> {
List<PersisterStateBatch> stateBatches;
lock.writeLock().lock();
try {
// Check if timer task is already cancelled. This can happen when concurrent requests
// happen to acknowledge in-flight state and timeout handler is waiting for the lock
// but already cancelled.
if (timerTask.isCancelled()) {
log.debug("Timer task is already cancelled, not executing further.");
return;
}
Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset);
if (floorOffset == null) {
log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition);

View File

@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DeliveryCountOps;
import org.apache.kafka.server.share.fetch.InFlightState;
import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@ -71,6 +72,7 @@ import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
@ -88,6 +90,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -7573,6 +7576,308 @@ public class SharePartitionTest {
assertEquals(20, sharePartition.nextFetchOffset());
}
@Test
public void testAcquisitionLockTimeoutWithConcurrentAcknowledgement() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withPersister(persister)
.build();
// Create 2 batches of records.
ByteBuffer buffer = ByteBuffer.allocate(4096);
memoryRecordsBuilder(buffer, 5, 0).close();
memoryRecordsBuilder(buffer, 15, 5).close();
buffer.flip();
MemoryRecords records = MemoryRecords.readableRecords(buffer);
// Acquire 10 records.
fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
5, /* Batch size of 5 so cache can have 2 entries */
10,
DEFAULT_FETCH_OFFSET,
fetchPartitionData(records, 0),
FETCH_ISOLATION_HWM),
20);
assertEquals(2, sharePartition.cachedState().size());
assertEquals(2, sharePartition.timer().size());
// Return 2 future which will be completed later.
CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>();
CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
// Store the corresponding batch timer tasks.
TimerTask timerTask1 = sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask();
TimerTask timerTask2 = sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask();
// Acknowledge 1 offset in first batch as Accept to create offset tracking, accept complete
// sencond batch. And mark offset 0 as release so cached state do not move ahead.
sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RELEASE.id)),
new ShareAcknowledgementBatch(1, 1, List.of(AcknowledgeType.ACCEPT.id)),
new ShareAcknowledgementBatch(5, 19, List.of(AcknowledgeType.ACCEPT.id))));
// Assert the start offset has not moved.
assertEquals(0L, sharePartition.startOffset());
assertEquals(2, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(0L).state());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(2L).state());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState());
// Verify ongoing transition states.
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(0L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(1L).hasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(2L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(5L).batchHasOngoingStateTransition());
// Validate first timer task is already cancelled.
assertTrue(timerTask1.isCancelled());
assertFalse(timerTask2.isCancelled());
// Fetch offset state timer tasks.
TimerTask timerTaskOffsetState1 = sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask();
TimerTask timerTaskOffsetState2 = sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask();
TimerTask timerTaskOffsetState3 = sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask();
// Complete futures.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
future1.complete(writeShareGroupStateResult);
future2.complete(writeShareGroupStateResult);
// Verify timer tasks are now cancelled, except unacknowledged offsets.
assertEquals(2, sharePartition.cachedState().size());
assertTrue(timerTask2.isCancelled());
assertTrue(timerTaskOffsetState1.isCancelled());
assertTrue(timerTaskOffsetState2.isCancelled());
assertFalse(timerTaskOffsetState3.isCancelled());
// Verify the state prior executing the timer tasks.
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(2L).state());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState());
// Running expired timer tasks should not mark offsets available, except for offset 2.
timerTask1.run();
// State should remain same.
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(2L).state());
timerTask2.run();
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState());
timerTaskOffsetState2.run();
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state());
// Should update the state to available as the timer task is not yet expired.
timerTaskOffsetState3.run();
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(2L).state());
}
@Test
public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withPersister(persister)
.build();
fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
// Validate that there is no ongoing transition.
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
// Return futures which will be completed later, so the batch state has ongoing transition.
CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>();
CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>();
// Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for
// offsets 2-6 and 7-11 respectively.
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
// Acknowledge batch to create ongoing transition.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id))));
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id))));
// Validate that there is no ongoing transition.
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
// Move LSO to 7, so some records/offsets can be marked archived for the first batch.
sharePartition.updateCacheAndOffsets(7L);
// Start offset will be moved.
assertEquals(12L, sharePartition.nextFetchOffset());
assertEquals(7L, sharePartition.startOffset());
assertEquals(11, sharePartition.endOffset());
assertEquals(2, sharePartition.cachedState().size());
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).batchState());
// Complete future1 exceptionally so acknowledgement for 2-6 offsets will be completed.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
future1.complete(writeShareGroupStateResult);
// The completion of future1 with exception should not impact the cached state since those records have already
// been archived.
assertEquals(12, sharePartition.nextFetchOffset());
assertEquals(7, sharePartition.startOffset());
assertEquals(11, sharePartition.endOffset());
assertEquals(2, sharePartition.cachedState().size());
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).batchState());
future2.complete(writeShareGroupStateResult);
assertEquals(12L, sharePartition.nextFetchOffset());
assertEquals(7, sharePartition.startOffset());
assertEquals(11, sharePartition.endOffset());
assertEquals(2, sharePartition.cachedState().size());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState());
}
@Test
public void inFlightStateRollbackAndArchiveStateTransition() throws InterruptedException {
InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 1, MEMBER_ID);
inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
assertTrue(inFlightState.hasOngoingStateTransition());
// We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet. At the same
// time when we have a call to completeStateTransition with false commit value, we get a call to ARCHIVE the record.
// No matter the order of the 2 calls, we should always be getting the final state as ARCHIVED.
ExecutorService executorService = Executors.newFixedThreadPool(2);
try {
List<Callable<Void>> callables = List.of(
() -> {
inFlightState.archive();
return null;
},
() -> {
inFlightState.completeStateTransition(false);
return null;
}
);
executorService.invokeAll(callables);
} finally {
if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
executorService.shutdown();
}
assertEquals(RecordState.ARCHIVED, inFlightState.state());
assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
}
@Test
public void inFlightStateCommitSuccessAndArchiveStateTransition() throws InterruptedException {
InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 1, MEMBER_ID);
inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
assertTrue(inFlightState.hasOngoingStateTransition());
// We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet. At the same
// time when we have a call to completeStateTransition with true commit value, we get a call to ARCHIVE the record.
// No matter the order of the 2 calls, we should always be getting the final state as ARCHIVED.
ExecutorService executorService = Executors.newFixedThreadPool(2);
try {
List<Callable<Void>> callables = List.of(
() -> {
inFlightState.archive();
return null;
},
() -> {
inFlightState.completeStateTransition(true);
return null;
}
);
executorService.invokeAll(callables);
} finally {
if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
executorService.shutdown();
}
assertEquals(RecordState.ARCHIVED, inFlightState.state());
assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
}
@Test
public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withPersister(persister)
.build();
fetchAcquiredRecords(
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
fetchPartitionData(memoryRecords(2, 0)), FETCH_ISOLATION_HWM
), 2
);
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
assertEquals(1, sharePartition.timer().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState());
// Return a future which will be completed later, so the batch state has ongoing transition.
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
// Acknowledge batch to create ongoing transition.
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 1, List.of(AcknowledgeType.ACCEPT.id))));
// Assert the start offset has not moved and batch has ongoing transition.
assertEquals(0L, sharePartition.startOffset());
assertEquals(1, sharePartition.cachedState().size());
assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).batchState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId());
// Timer task has not been expired yet.
assertFalse(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired());
// Allowing acquisition lock to expire. This will not cause any change because the record is not in ACQUIRED state.
// This will remove the entry of the timer task from timer.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.cachedState().get(0L).batchState() == RecordState.ACKNOWLEDGED &&
sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 &&
sharePartition.timer().size() == 0,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> assertionFailedMessage(sharePartition, Map.of(0L, List.of())));
// Acquisition lock timeout task has run already and is not null.
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
// Timer task should be expired now.
assertTrue(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired());
// Complete future exceptionally so acknowledgement for 0-1 offsets will be completed.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
future.complete(writeShareGroupStateResult);
// Even though write state RPC has failed and corresponding acquisition lock timeout task has expired,
// the record should not stuck in ACQUIRED state with no acquisition lock timeout task.
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).batchState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId());
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
}
/**
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
*/

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.server.util.timer.TimerTask;
/**
* AcquisitionLockTimeoutHandler is an interface that defines a handler for acquisition lock timeouts.
* It is used to handle cases where the acquisition lock for a share partition times out.
@ -29,6 +31,6 @@ public interface AcquisitionLockTimeoutHandler {
* @param firstOffset the first offset
* @param lastOffset the last offset
*/
void handle(String memberId, long firstOffset, long lastOffset);
void handle(String memberId, long firstOffset, long lastOffset, TimerTask timerTask);
}

View File

@ -32,6 +32,7 @@ public class AcquisitionLockTimerTask extends TimerTask {
private final long lastOffset;
private final AcquisitionLockTimeoutHandler timeoutHandler;
private final SharePartitionMetrics sharePartitionMetrics;
private volatile boolean hasExpired;
public AcquisitionLockTimerTask(
Time time,
@ -49,18 +50,28 @@ public class AcquisitionLockTimerTask extends TimerTask {
this.lastOffset = lastOffset;
this.timeoutHandler = timeoutHandler;
this.sharePartitionMetrics = sharePartitionMetrics;
this.hasExpired = false;
}
public long expirationMs() {
return expirationMs;
}
public boolean hasExpired() {
return hasExpired;
}
/**
* The task is executed when the acquisition lock timeout is reached. The task releases the acquired records.
*/
@Override
public void run() {
// Mark the request as expired prior executing the timeout. There might be concurrent execution
// of timeout task and failed acknowledgement which checks if the timeout task has expired.
// But only one shall update the state to available. The concurrent execution is protected by
// write lock on the state.
hasExpired = true;
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1);
timeoutHandler.handle(memberId, firstOffset, lastOffset);
timeoutHandler.handle(memberId, firstOffset, lastOffset, this);
}
}

View File

@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
/**
* The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
* <p>
* This class is not thread-safe and caller should attain locks if concurrent updates on same batch
* are expected.
*/
public class InFlightBatch {
// The timer is used to schedule the acquisition lock timeout task for the batch.
@ -147,11 +150,10 @@ public class InFlightBatch {
/**
* Archive the batch state. This is used to mark the batch as archived and no further updates
* are allowed to the batch state.
* @param newMemberId The new member id for the records.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public void archiveBatch(String newMemberId) {
inFlightState().archive(newMemberId);
public void archiveBatch() {
inFlightState().archive();
}
/**

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.Uuid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,11 +27,19 @@ import java.util.Objects;
* The InFlightState is used to track the state and delivery count of a record that has been
* fetched from the leader. The state of the record is used to determine if the record should
* be re-deliver or if it can be acknowledged or archived.
* <p>
* This class is not thread-safe and caller should attain locks if concurrent updates on same state
* is expected.
*/
public class InFlightState {
private static final Logger log = LoggerFactory.getLogger(InFlightState.class);
/**
* empty member id used to indicate when a record is not acquired by any member.
*/
private static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString();
// The state of the fetch batch records.
private RecordState state;
// The number of times the records has been delivered to the client.
@ -41,6 +51,9 @@ public class InFlightState {
private InFlightState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
// The boolean determines if the record has achieved a terminal state of ARCHIVED from which it cannot transition
// to any other state. This could happen because of LSO movement etc.
private boolean isTerminalState = false;
// Visible for testing.
public InFlightState(RecordState state, int deliveryCount, String memberId) {
@ -103,8 +116,10 @@ public class InFlightState {
* and clear the reference to it.
*/
public void cancelAndClearAcquisitionLockTimeoutTask() {
acquisitionLockTimeoutTask.cancel();
acquisitionLockTimeoutTask = null;
if (acquisitionLockTimeoutTask != null) {
acquisitionLockTimeoutTask.cancel();
acquisitionLockTimeoutTask = null;
}
}
/**
@ -115,12 +130,9 @@ public class InFlightState {
* @return true if there is an ongoing state transition, false otherwise.
*/
public boolean hasOngoingStateTransition() {
if (rollbackState == null) {
// This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
// been committed.
return false;
}
return rollbackState.state != null;
// If batch/offset hasn't transitioned even once or the state transitions have been
// committed then rollbackState should always be null.
return rollbackState != null;
}
/**
@ -138,6 +150,17 @@ public class InFlightState {
*/
public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
try {
// If the state transition is in progress, the state should not be updated.
if (hasOngoingStateTransition()) {
// A misbehaving client can send multiple requests to update the same records hence
// do not proceed if the transition is already in progress. Do not log an error here
// as it might not be an error rather concurrent update of same state due to multiple
// requests. This ideally should not happen hence log in info level, if it happens
// frequently then it might be an issue which needs to be investigated.
log.info("{} has ongoing state transition, cannot update to: {}", this, newState);
return null;
}
if (newState == RecordState.AVAILABLE && ops != DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
newState = RecordState.ARCHIVED;
}
@ -149,7 +172,6 @@ public class InFlightState {
return this;
} catch (IllegalStateException e) {
log.error("Failed to update state of the records", e);
rollbackState = null;
return null;
}
}
@ -159,9 +181,11 @@ public class InFlightState {
* cancelling the acquisition lock timeout task.
* This method is used to archive the record when it is no longer needed.
*/
public void archive(String newMemberId) {
public void archive() {
isTerminalState = true;
state = RecordState.ARCHIVED;
memberId = newMemberId;
memberId = EMPTY_MEMBER_ID;
cancelAndClearAcquisitionLockTimeoutTask();
}
/**
@ -178,8 +202,12 @@ public class InFlightState {
* helps update chaining.
*/
public InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
if (updatedState != null) {
rollbackState = currentState;
}
return updatedState;
}
/**
@ -190,13 +218,22 @@ public class InFlightState {
* @param commit If true, commits the state transition, otherwise rolls back.
*/
public void completeStateTransition(boolean commit) {
if (commit) {
if (commit || isTerminalState) {
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
cancelAndClearAcquisitionLockTimeoutTask();
rollbackState = null;
return;
}
state = rollbackState.state;
deliveryCount = rollbackState.deliveryCount;
memberId = rollbackState.memberId;
// Check is acquisition lock timeout task is expired then mark the message as Available.
if (acquisitionLockTimeoutTask != null && acquisitionLockTimeoutTask.hasExpired()) {
state = RecordState.AVAILABLE;
memberId = EMPTY_MEMBER_ID;
cancelAndClearAcquisitionLockTimeoutTask();
} else {
state = rollbackState.state;
memberId = rollbackState.memberId;
}
deliveryCount = rollbackState.deliveryCount();
rollbackState = null;
}