diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index dbd7e5e1730..db9b862839c 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -241,6 +241,12 @@ public class SharePartition { */ private final AcquisitionLockTimeoutHandler timeoutHandler; + /** + * The replica manager is used to check to see if any delayed share fetch request can be completed because of data + * availability due to acquisition lock timeout. + */ + private final ReplicaManager replicaManager; + /** * The share partition start offset specifies the partition start offset from which the records * are cached in the cachedState of the sharePartition. @@ -295,12 +301,6 @@ public class SharePartition { */ private long fetchLockIdleDurationMs; - /** - * The replica manager is used to check to see if any delayed share fetch request can be completed because of data - * availability due to acquisition lock timeout. - */ - private final ReplicaManager replicaManager; - SharePartition( String groupId, TopicIdPartition topicIdPartition, @@ -1245,10 +1245,7 @@ public class SharePartition { continue; } - offsetState.getValue().archive(EMPTY_MEMBER_ID); - if (initialState == RecordState.ACQUIRED) { - offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask(); - } + offsetState.getValue().archive(); isAnyOffsetArchived = true; } return isAnyOffsetArchived; @@ -1263,10 +1260,7 @@ public class SharePartition { log.trace("Archiving complete batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition); if (inFlightBatch.batchState() == initialState) { // Change the state of complete batch since the same state exists for the entire inFlight batch. 
- inFlightBatch.archiveBatch(EMPTY_MEMBER_ID); - if (initialState == RecordState.ACQUIRED) { - inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask(); - } + inFlightBatch.archiveBatch(); return true; } } finally { @@ -1799,6 +1793,12 @@ public class SharePartition { if (throwable.isPresent()) { return throwable; } + + if (inFlightBatch.batchHasOngoingStateTransition()) { + log.debug("The batch has on-going transition, batch: {} for the share " + + "partition: {}-{}", inFlightBatch, groupId, topicIdPartition); + return Optional.of(new InvalidRecordStateException("The record state is invalid. The acknowledgement of delivery could not be completed.")); + } } // Determine if the in-flight batch is a full match from the request batch. @@ -1899,7 +1899,15 @@ public class SharePartition { + " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId, topicIdPartition); return Optional.of(new InvalidRecordStateException( - "The batch cannot be acknowledged. The offset is not acquired.")); + "The offset cannot be acknowledged. The offset is not acquired.")); + } + + if (offsetState.getValue().hasOngoingStateTransition()) { + log.debug("The offset has on-going transition, offset: {} batch: {} for the share" + + " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId, + topicIdPartition); + return Optional.of(new InvalidRecordStateException( + "The record state is invalid. The acknowledgement of delivery could not be completed.")); } // Check if member id is the owner of the offset. @@ -2044,7 +2052,12 @@ public class SharePartition { // Log in DEBUG to avoid flooding of logs for a faulty client. 
log.debug("Request failed for updating state, rollback any changed state" + " for the share partition: {}-{}", groupId, topicIdPartition); - updatedStates.forEach(state -> state.completeStateTransition(false)); + updatedStates.forEach(state -> { + state.completeStateTransition(false); + if (state.state() == RecordState.AVAILABLE) { + updateFindNextFetchOffset(true); + } + }); future.completeExceptionally(throwable); return; } @@ -2067,7 +2080,14 @@ public class SharePartition { if (exception != null) { log.debug("Failed to write state to persister for the share partition: {}-{}", groupId, topicIdPartition, exception); - updatedStates.forEach(state -> state.completeStateTransition(false)); + // In case of failure when transition state is rolled back then it should be rolled + // back to ACQUIRED state, unless acquisition lock for the state has expired. + updatedStates.forEach(state -> { + state.completeStateTransition(false); + if (state.state() == RecordState.AVAILABLE) { + updateFindNextFetchOffset(true); + } + }); future.completeExceptionally(exception); return; } @@ -2076,8 +2096,6 @@ public class SharePartition { groupId, topicIdPartition); updatedStates.forEach(state -> { state.completeStateTransition(true); - // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully. - state.cancelAndClearAcquisitionLockTimeoutTask(); if (state.state() == RecordState.AVAILABLE) { updateFindNextFetchOffset(true); } @@ -2389,10 +2407,18 @@ public class SharePartition { } private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() { - return (memberId, firstOffset, lastOffset) -> { + return (memberId, firstOffset, lastOffset, timerTask) -> { List stateBatches; lock.writeLock().lock(); try { + // Check if timer task is already cancelled. This can happen when concurrent requests + // happen to acknowledge in-flight state and timeout handler is waiting for the lock + // but already cancelled. 
+ if (timerTask.isCancelled()) { + log.debug("Timer task is already cancelled, not executing further."); + return; + } + Map.Entry floorOffset = cachedState.floorEntry(firstOffset); if (floorOffset == null) { log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition); diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c17ce391b37..1289d720054 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; +import org.apache.kafka.server.share.fetch.DeliveryCountOps; import org.apache.kafka.server.share.fetch.InFlightState; import org.apache.kafka.server.share.fetch.RecordState; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; @@ -71,6 +72,7 @@ import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.server.util.FutureUtils; import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.test.TestUtils; @@ -88,6 +90,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -7573,6 +7576,308 @@ public class SharePartitionTest { assertEquals(20, sharePartition.nextFetchOffset()); } + @Test + public 
void testAcquisitionLockTimeoutWithConcurrentAcknowledgement() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withPersister(persister) + .build(); + + // Create 2 batches of records. + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 5, 0).close(); + memoryRecordsBuilder(buffer, 15, 5).close(); + + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + // Acquire 10 records. + fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + 5, /* Batch size of 5 so cache can have 2 entries */ + 10, + DEFAULT_FETCH_OFFSET, + fetchPartitionData(records, 0), + FETCH_ISOLATION_HWM), + 20); + + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(2, sharePartition.timer().size()); + + // Return 2 futures which will be completed later. + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Store the corresponding batch timer tasks. + TimerTask timerTask1 = sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask(); + TimerTask timerTask2 = sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask(); + + // Acknowledge 1 offset in first batch as Accept to create offset tracking, accept complete + // second batch. And mark offset 0 as release so cached state does not move ahead. + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(1, 1, List.of(AcknowledgeType.ACCEPT.id)), + new ShareAcknowledgementBatch(5, 19, List.of(AcknowledgeType.ACCEPT.id)))); + + // Assert the start offset has not moved. 
+ assertEquals(0L, sharePartition.startOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(0L).state()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(2L).state()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState()); + // Verify ongoing transition states. + assertTrue(sharePartition.cachedState().get(0L).offsetState().get(0L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(0L).offsetState().get(1L).hasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(0L).offsetState().get(2L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(5L).batchHasOngoingStateTransition()); + + // Validate first timer task is already cancelled. + assertTrue(timerTask1.isCancelled()); + assertFalse(timerTask2.isCancelled()); + + // Fetch offset state timer tasks. + TimerTask timerTaskOffsetState1 = sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask(); + TimerTask timerTaskOffsetState2 = sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask(); + TimerTask timerTaskOffsetState3 = sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask(); + + // Complete futures. 
+ WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future1.complete(writeShareGroupStateResult); + future2.complete(writeShareGroupStateResult); + + // Verify timer tasks are now cancelled, except unacknowledged offsets. + assertEquals(2, sharePartition.cachedState().size()); + assertTrue(timerTask2.isCancelled()); + assertTrue(timerTaskOffsetState1.isCancelled()); + assertTrue(timerTaskOffsetState2.isCancelled()); + assertFalse(timerTaskOffsetState3.isCancelled()); + + // Verify the state prior executing the timer tasks. + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(2L).state()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState()); + + // Running expired timer tasks should not mark offsets available, except for offset 2. + timerTask1.run(); + // State should remain same. + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(2L).state()); + + timerTask2.run(); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState()); + + timerTaskOffsetState2.run(); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).offsetState().get(1L).state()); + + // Should update the state to available as the timer task is not yet expired. 
+ timerTaskOffsetState3.run(); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(2L).state()); + } + + @Test + public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for + // offsets 2-6 and 7-11 respectively. + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id)))); + + // Validate that both batches now have an ongoing transition. + assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // Move LSO to 7, so some records/offsets can be marked archived for the first batch. + sharePartition.updateCacheAndOffsets(7L); + + // Start offset will be moved. 
+ assertEquals(12L, sharePartition.nextFetchOffset()); + assertEquals(7L, sharePartition.startOffset()); + assertEquals(11, sharePartition.endOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).batchState()); + + // Complete future1 exceptionally so acknowledgement for 2-6 offsets will be completed. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); + future1.complete(writeShareGroupStateResult); + + // The completion of future1 with exception should not impact the cached state since those records have already + // been archived. 
+ assertEquals(12, sharePartition.nextFetchOffset()); + assertEquals(7, sharePartition.startOffset()); + assertEquals(11, sharePartition.endOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).batchState()); + + future2.complete(writeShareGroupStateResult); + assertEquals(12L, sharePartition.nextFetchOffset()); + assertEquals(7, sharePartition.startOffset()); + assertEquals(11, sharePartition.endOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState()); + } + + @Test + public void inFlightStateRollbackAndArchiveStateTransition() throws InterruptedException { + InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 1, MEMBER_ID); + + inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID); + assertTrue(inFlightState.hasOngoingStateTransition()); + + // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet. At the same + // time when we have a call to completeStateTransition with false commit value, we get a call to ARCHIVE the record. + // No matter the order of the 2 calls, we should always be getting the final state as ARCHIVED. 
+ ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + List> callables = List.of( + () -> { + inFlightState.archive(); + return null; + }, + () -> { + inFlightState.completeStateTransition(false); + return null; + } + ); + executorService.invokeAll(callables); + } finally { + if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS)) + executorService.shutdown(); + } + assertEquals(RecordState.ARCHIVED, inFlightState.state()); + assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId()); + } + + @Test + public void inFlightStateCommitSuccessAndArchiveStateTransition() throws InterruptedException { + InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 1, MEMBER_ID); + + inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID); + assertTrue(inFlightState.hasOngoingStateTransition()); + + // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet. At the same + // time when we have a call to completeStateTransition with true commit value, we get a call to ARCHIVE the record. + // No matter the order of the 2 calls, we should always be getting the final state as ARCHIVED. 
+ ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + List> callables = List.of( + () -> { + inFlightState.archive(); + return null; + }, + () -> { + inFlightState.completeStateTransition(true); + return null; + } + ); + executorService.invokeAll(callables); + } finally { + if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS)) + executorService.shutdown(); + } + assertEquals(RecordState.ARCHIVED, inFlightState.state()); + assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId()); + } + + @Test + public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withPersister(persister) + .build(); + + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, + fetchPartitionData(memoryRecords(2, 0)), FETCH_ISOLATION_HWM + ), 2 + ); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState()); + + // Return a future which will be completed later, so the batch state has ongoing transition. + CompletableFuture future = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); + + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 1, List.of(AcknowledgeType.ACCEPT.id)))); + // Assert the start offset has not moved and batch has ongoing transition. 
+ assertEquals(0L, sharePartition.startOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(0L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId()); + // Timer task has not been expired yet. + assertFalse(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired()); + + // Allowing acquisition lock to expire. This will not cause any change because the record is not in ACQUIRED state. + // This will remove the entry of the timer task from timer. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); + TestUtils.waitForCondition( + () -> sharePartition.cachedState().get(0L).batchState() == RecordState.ACKNOWLEDGED && + sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && + sharePartition.timer().size() == 0, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of()))); + + // Acquisition lock timeout task has run already and is not null. + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + // Timer task should be expired now. + assertTrue(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired()); + + // Complete future exceptionally so acknowledgement for 0-1 offsets will be completed. 
+ WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); + future.complete(writeShareGroupStateResult); + + // Even though write state RPC has failed and corresponding acquisition lock timeout task has expired, + // the record should not be stuck in ACQUIRED state with no acquisition lock timeout task. + assertEquals(1, sharePartition.cachedState().size()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId()); + assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + } + /** * This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT). */ diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java index c83d7e537da..b5480f2e54d 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.server.share.fetch; +import org.apache.kafka.server.util.timer.TimerTask; + /** * AcquisitionLockTimeoutHandler is an interface that defines a handler for acquisition lock timeouts. * It is used to handle cases where the acquisition lock for a share partition times out. 
@@ -29,6 +31,6 @@ public interface AcquisitionLockTimeoutHandler { * @param firstOffset the first offset * @param lastOffset the last offset */ - void handle(String memberId, long firstOffset, long lastOffset); + void handle(String memberId, long firstOffset, long lastOffset, TimerTask timerTask); } diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java index 6796d24d374..2766412fa6e 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java @@ -32,6 +32,7 @@ public class AcquisitionLockTimerTask extends TimerTask { private final long lastOffset; private final AcquisitionLockTimeoutHandler timeoutHandler; private final SharePartitionMetrics sharePartitionMetrics; + private volatile boolean hasExpired; public AcquisitionLockTimerTask( Time time, @@ -49,18 +50,28 @@ public class AcquisitionLockTimerTask extends TimerTask { this.lastOffset = lastOffset; this.timeoutHandler = timeoutHandler; this.sharePartitionMetrics = sharePartitionMetrics; + this.hasExpired = false; } public long expirationMs() { return expirationMs; } + public boolean hasExpired() { + return hasExpired; + } + /** * The task is executed when the acquisition lock timeout is reached. The task releases the acquired records. */ @Override public void run() { + // Mark the request as expired prior executing the timeout. There might be concurrent execution + // of timeout task and failed acknowledgement which checks if the timeout task has expired. + // But only one shall update the state to available. The concurrent execution is protected by + // write lock on the state. 
+ hasExpired = true; sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1); - timeoutHandler.handle(memberId, firstOffset, lastOffset); + timeoutHandler.handle(memberId, firstOffset, lastOffset, this); } } diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java index c3e2d353328..df40b943d5a 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java @@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentSkipListMap; /** * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + *

+ * This class is not thread-safe and caller should attain locks if concurrent updates on same batch + * are expected. */ public class InFlightBatch { // The timer is used to schedule the acquisition lock timeout task for the batch. @@ -147,11 +150,10 @@ public class InFlightBatch { /** * Archive the batch state. This is used to mark the batch as archived and no further updates * are allowed to the batch state. - * @param newMemberId The new member id for the records. * @throws IllegalStateException if the offset state is maintained and the batch state is not available. */ - public void archiveBatch(String newMemberId) { - inFlightState().archive(newMemberId); + public void archiveBatch() { + inFlightState().archive(); } /** diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java index 97b2d55ed0b..d5831d74853 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.server.share.fetch; +import org.apache.kafka.common.Uuid; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,11 +27,19 @@ import java.util.Objects; * The InFlightState is used to track the state and delivery count of a record that has been * fetched from the leader. The state of the record is used to determine if the record should * be re-deliver or if it can be acknowledged or archived. + *

+ * This class is not thread-safe and caller should attain locks if concurrent updates on same state + * is expected. */ public class InFlightState { private static final Logger log = LoggerFactory.getLogger(InFlightState.class); + /** + * empty member id used to indicate when a record is not acquired by any member. + */ + private static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString(); + // The state of the fetch batch records. private RecordState state; // The number of times the records has been delivered to the client. @@ -41,6 +51,9 @@ public class InFlightState { private InFlightState rollbackState; // The timer task for the acquisition lock timeout. private AcquisitionLockTimerTask acquisitionLockTimeoutTask; + // The boolean determines if the record has achieved a terminal state of ARCHIVED from which it cannot transition + // to any other state. This could happen because of LSO movement etc. + private boolean isTerminalState = false; // Visible for testing. public InFlightState(RecordState state, int deliveryCount, String memberId) { @@ -103,8 +116,10 @@ public class InFlightState { * and clear the reference to it. */ public void cancelAndClearAcquisitionLockTimeoutTask() { - acquisitionLockTimeoutTask.cancel(); - acquisitionLockTimeoutTask = null; + if (acquisitionLockTimeoutTask != null) { + acquisitionLockTimeoutTask.cancel(); + acquisitionLockTimeoutTask = null; + } } /** @@ -115,12 +130,9 @@ public class InFlightState { * @return true if there is an ongoing state transition, false otherwise. */ public boolean hasOngoingStateTransition() { - if (rollbackState == null) { - // This case could occur when the batch/offset hasn't transitioned even once or the state transitions have - // been committed. - return false; - } - return rollbackState.state != null; + // If batch/offset hasn't transitioned even once or the state transitions have been + // committed then rollbackState should always be null. 
+ return rollbackState != null; } /** @@ -138,6 +150,17 @@ public class InFlightState { */ public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) { try { + // If the state transition is in progress, the state should not be updated. + if (hasOngoingStateTransition()) { + // A misbehaving client can send multiple requests to update the same records hence + // do not proceed if the transition is already in progress. Do not log an error here + // as it might not be an error rather concurrent update of same state due to multiple + // requests. This ideally should not happen hence log in info level, if it happens + // frequently then it might be an issue which needs to be investigated. + log.info("{} has ongoing state transition, cannot update to: {}", this, newState); + return null; + } + if (newState == RecordState.AVAILABLE && ops != DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) { newState = RecordState.ARCHIVED; } @@ -149,7 +172,6 @@ public class InFlightState { return this; } catch (IllegalStateException e) { log.error("Failed to update state of the records", e); - rollbackState = null; return null; } } @@ -159,9 +181,11 @@ public class InFlightState { * cancelling the acquisition lock timeout task. * This method is used to archive the record when it is no longer needed. */ - public void archive(String newMemberId) { + public void archive() { + isTerminalState = true; state = RecordState.ARCHIVED; - memberId = newMemberId; + memberId = EMPTY_MEMBER_ID; + cancelAndClearAcquisitionLockTimeoutTask(); } /** @@ -178,8 +202,12 @@ public class InFlightState { * helps update chaining. 
*/ public InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) { - rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask); - return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId); + InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask); + InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId); + if (updatedState != null) { + rollbackState = currentState; + } + return updatedState; } /** @@ -190,13 +218,22 @@ public class InFlightState { * @param commit If true, commits the state transition, otherwise rolls back. */ public void completeStateTransition(boolean commit) { - if (commit) { + if (commit || isTerminalState) { + // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully. + cancelAndClearAcquisitionLockTimeoutTask(); rollbackState = null; return; } - state = rollbackState.state; - deliveryCount = rollbackState.deliveryCount; - memberId = rollbackState.memberId; + // If the acquisition lock timeout task has already expired, mark the record as Available instead of rolling back to the previous state. + if (acquisitionLockTimeoutTask != null && acquisitionLockTimeoutTask.hasExpired()) { + state = RecordState.AVAILABLE; + memberId = EMPTY_MEMBER_ID; + cancelAndClearAcquisitionLockTimeoutTask(); + } else { + state = rollbackState.state; + memberId = rollbackState.memberId; + } + deliveryCount = rollbackState.deliveryCount(); rollbackState = null; }