From 3f3b070a6a239dc801d4dd83c8ee55ccf497346b Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Tue, 25 Jun 2024 12:37:28 +0530 Subject: [PATCH] KAFKA-16755: Implement lock timeout functionality in SharePartition (#16414) Implemented acquisition lock timeout functionality in SharePartition. Implemented the following functions - 1. releaseAcquisitionLockOnTimeout - This function is executed when the acquisition lock timeout is reached. The function releases the acquired records. 2. releaseAcquisitionLockOnTimeoutForCompleteBatch - Function which releases acquired records due to acquisition lock timeout maintained at a batch level. 3. releaseAcquisitionLockOnTimeoutForPerOffsetBatch - Function which releases acquired records due to acquisition lock timeout maintained at an offset level. Reviewers: Andrew Schofield , Apoorv Mittal , Manikumar Reddy , --- .../kafka/server/share/SharePartition.java | 119 ++- .../server/share/SharePartitionTest.java | 755 ++++++++++++++++++ 2 files changed, 873 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 024f65fb704..3948fbf676d 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -1318,7 +1318,119 @@ public class SharePartition { } private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) { - // TODO: Implement the logic to release the acquisition lock on timeout. + lock.writeLock().lock(); + try { + Map.Entry floorOffset = cachedState.floorEntry(firstOffset); + if (floorOffset == null) { + log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition); + return; + } + List stateBatches = new ArrayList<>(); + NavigableMap subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true); + for (Map.Entry entry : subMap.entrySet()) { + InFlightBatch inFlightBatch = entry.getValue(); + + if (inFlightBatch.offsetState() == null + && inFlightBatch.batchState() == RecordState.ACQUIRED + && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) { + + // For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some + // acquired records that need to move to archived state despite their delivery count. + inFlightBatch.maybeInitializeOffsetStateUpdate(); + } + + // Case when the state of complete batch is valid + if (inFlightBatch.offsetState() == null) { + releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId); + } else { // Case when batch has a valid offset state map. + releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset); + } + } + + if (!stateBatches.isEmpty() && !isWriteShareGroupStateSuccessful(stateBatches)) { + + // Even if write share group state RPC call fails, we will still go ahead with the state transition. + log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId {}. " + + "Proceeding with state transition.", groupId, topicIdPartition, memberId); + } + + // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. + maybeUpdateCachedStateAndOffsets(); + } finally { + lock.writeLock().unlock(); + } + } + + private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch, + List stateBatches, + String memberId) { + if (inFlightBatch.batchState() == RecordState.ACQUIRED) { + InFlightState updateResult = inFlightBatch.tryUpdateBatchState( + inFlightBatch.lastOffset() < startOffset ? RecordState.ARCHIVED : RecordState.AVAILABLE, + false, + maxDeliveryCount, + EMPTY_MEMBER_ID); + if (updateResult == null) { + log.error("Unable to release acquisition lock on timeout for the batch: {}" + + " for the share partition: {}-{} memberId: {}", inFlightBatch, groupId, topicIdPartition, memberId); + return; + } + stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), + updateResult.state.id, (short) updateResult.deliveryCount)); + + // Update acquisition lock timeout task for the batch to null since it is completed now. + updateResult.updateAcquisitionLockTimeoutTask(null); + if (updateResult.state != RecordState.ARCHIVED) { + findNextFetchOffset.set(true); + } + return; + } + log.debug("The batch is not in acquired state while release of acquisition lock on timeout, skipping, batch: {}" + + " for the share group: {}-{}-{}", inFlightBatch, groupId, memberId, topicIdPartition); + } + + private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch, + List stateBatches, + String memberId, + long firstOffset, + long lastOffset) { + for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) { + + // For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + if (offsetState.getKey() < firstOffset) { + continue; + } + if (offsetState.getKey() > lastOffset) { + // No further offsets to process. + break; + } + if (offsetState.getValue().state != RecordState.ACQUIRED) { + log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {}" + + " for the share group: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition, memberId); + continue; + } + InFlightState updateResult = offsetState.getValue().tryUpdateState( + offsetState.getKey() < startOffset ? RecordState.ARCHIVED : RecordState.AVAILABLE, + false, + maxDeliveryCount, + EMPTY_MEMBER_ID); + if (updateResult == null) { + log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {}" + + " for the share group: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition, memberId); + continue; + } + stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(), + updateResult.state.id, (short) updateResult.deliveryCount)); + + // Update acquisition lock timeout task for the offset to null since it is completed now. + updateResult.updateAcquisitionLockTimeoutTask(null); + if (updateResult.state != RecordState.ARCHIVED) { + findNextFetchOffset.set(true); + } + } } // Visible for testing. Should only be used for testing purposes. @@ -1361,6 +1473,11 @@ public class SharePartition { return stateEpoch; } + // Visible for testing. + Timer timer() { + return timer; + } + private final class AcquisitionLockTimerTask extends TimerTask { private final long expirationMs; private final String memberId; diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 874b08d7dc9..12a9456e5b8 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.server.group.share.Persister; import org.apache.kafka.server.group.share.PersisterStateBatch; import org.apache.kafka.server.group.share.ReadShareGroupStateResult; import org.apache.kafka.server.group.share.TopicData; +import org.apache.kafka.server.group.share.WriteShareGroupStateResult; import org.apache.kafka.server.share.ShareAcknowledgementBatch; import org.apache.kafka.server.util.FutureUtils; import org.apache.kafka.server.util.timer.SystemTimer; @@ -80,6 +81,8 @@ public class SharePartitionTest { private static Timer mockTimer; private static final Time MOCK_TIME = new MockTime(); private static final short MAX_IN_FLIGHT_MESSAGES = 200; + private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100; + private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 200; @BeforeEach public void setUp() { @@ -1063,6 +1066,739 @@ public class SharePartitionTest { assertEquals(29, sharePartition.nextFetchOffset()); } + @Test + public void testAcquisitionLockForAcquiringSingleRecord() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(1), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.nextFetchOffset() == 0 && + sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && + sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null && + sharePartition.timer().size() == 0, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertEquals(1, sharePartition.timer().size()); + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 + && sharePartition.nextFetchOffset() == 10 + && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE + && sharePartition.cachedState().get(10L).batchDeliveryCount() == 1 + && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Add records from 0-9 offsets, 5-9 should be acquired and 0-4 should be ignored. + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(10, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(2, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for all the acquired records. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 0 && + sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null && + sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 10 && + sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + // Acquire the same batch again. + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + // Acquisition lock timeout task should be created on re-acquire action. + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + } + + @Test + public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 10, 0, memoryRecords(1, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(0, 0, Collections.singletonList((byte) 2)))); + + assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(0, sharePartition.timer().size()); + + assertEquals(0, sharePartition.nextFetchOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).batchState()); + assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(0L).offsetState()); + + // Allowing acquisition lock to expire. This will not cause any change to cached state map since the batch is already acknowledged. + // Hence, the acquisition lock timeout task would be cancelled already. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 0 && + sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && + sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 14, Collections.singletonList((byte) 2)))); + + assertEquals(5, sharePartition.nextFetchOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState()); + assertEquals(1, sharePartition.cachedState().get(5L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(0, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. This will not cause any change to cached state map since the batch is already acknowledged. + // Hence, the acquisition lock timeout task would be cancelled already. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 5 && + sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(5L).batchDeliveryCount() == 1 && + sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + MemoryRecords records1 = memoryRecords(2, 5); + // Untracked gap of 3 offsets from 7-9. + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10); + // Gap from 15-17 offsets. + recordsBuilder.appendWithOffset(18, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + MemoryRecords records2 = recordsBuilder.build(); + MemoryRecords records3 = memoryRecords(2, 1); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records3, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(2, sharePartition.timer().size()); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(3, sharePartition.timer().size()); + + sharePartition.acknowledge(MEMBER_ID, + // Do not send gap offsets to verify that they are ignored and accepted as per client ack. + Collections.singletonList(new ShareAcknowledgementBatch(5, 18, Collections.singletonList((byte) 1)))); + + assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for batch with starting offset 1. + // Since, other records have been acknowledged. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 1 && + sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask() == null && + sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null && + sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(1L).batchState()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(10L).batchState()); + } + + @Test + public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(8, 10), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 10 && + sharePartition.cachedState().size() == 1 && + sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + // Acquire subset of records again. + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(3, 12), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + // Acquisition lock timeout task should be created only on offsets which have been acquired again. + assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask()); + assertEquals(3, sharePartition.timer().size()); + + // Allowing acquisition lock to expire for the acquired subset batch. + TestUtils.waitForCondition( + () -> { + Map expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(15L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(16L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(17L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + + return sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 10 && + expectedOffsetStateMap.equals(sharePartition.cachedState().get(10L).offsetState()); + }, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask()); + } + + @Test + public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOffsets() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + MemoryRecords records1 = memoryRecords(2, 5); + // Untracked gap of 3 offsets from 7-9. + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + // Gap from 12-13 offsets. + recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + // Gap for 15 offset. + recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + // Gap from 17-19 offsets. + recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + MemoryRecords records2 = recordsBuilder.build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(2, sharePartition.timer().size()); + + // Acknowledging over subset of both batch with subset of gap offsets. + sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch( + 6, 18, Arrays.asList( + (byte) 1, (byte) 1, (byte) 1, + (byte) 1, (byte) 1, (byte) 1, + (byte) 0, (byte) 0, (byte) 1, + (byte) 0, (byte) 1, (byte) 0, + (byte) 1)))); + + assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); + + assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(20L).acquisitionLockTimeoutTask()); + assertEquals(3, sharePartition.timer().size()); + + // Allowing acquisition lock to expire for the offsets that have not been acknowledged yet. + TestUtils.waitForCondition( + () -> { + Map expectedOffsetStateMap1 = new HashMap<>(); + expectedOffsetStateMap1.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap1.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + + Map expectedOffsetStateMap2 = new HashMap<>(); + expectedOffsetStateMap2.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap2.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + + return sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 5 && + expectedOffsetStateMap1.equals(sharePartition.cachedState().get(5L).offsetState()) && + expectedOffsetStateMap2.equals(sharePartition.cachedState().get(10L).offsetState()); + }, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); + + assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(20L).acquisitionLockTimeoutTask()); + } + + @Test + public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records + .build(); + + // Adding memoryRecords(10, 0) in the sharePartition to make sure that SPSO doesn't move forward when delivery count of records2 + // exceed the max delivery count. + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 10), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(2, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 0 && + sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(10L).batchDeliveryCount() == 1 && + sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 10), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(2, sharePartition.cachedState().get(10L).batchDeliveryCount()); + assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire to archive the records that reach max delivery count. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 0 && + // After the second delivery attempt fails to acknowledge the record correctly, the record should be archived. + sharePartition.cachedState().get(10L).batchState() == RecordState.ARCHIVED && + sharePartition.cachedState().get(10L).batchDeliveryCount() == 2 && + sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records + .build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 0 && + sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && + sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(5, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(3L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(4L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(6L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(7L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(8L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(9L).acquisitionLockTimeoutTask()); + + // Allowing acquisition lock to expire to archive the records that reach max delivery count. + TestUtils.waitForCondition( + () -> { + Map expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(0L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(1L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(2L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(3L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(4L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(8L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(9L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + + return sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && + expectedOffsetStateMap.equals(sharePartition.cachedState().get(0L).offsetState()); + }, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + assertNull(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(3L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(4L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(6L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(7L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(8L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(9L).acquisitionLockTimeoutTask()); + + // Since only first 5 records from the batch are archived, the batch remains in the cachedState, but the + // start offset is updated + assertEquals(5, sharePartition.startOffset()); + } + + @Test + public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records + .build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 0 && + sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire to archive the records that reach max delivery count. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + // After the second failed attempt to acknowledge the record batch successfully, the record batch is archived. + // Since this is the first batch in the share partition, SPSO moves forward and the cachedState is cleared + sharePartition.cachedState().isEmpty() && + sharePartition.nextFetchOffset() == 10, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 5 && + sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + // Acknowledge with ACCEPT type should throw InvalidRecordStateException since they've been released due to acquisition lock timeout. + CompletableFuture> ackResult = sharePartition.acknowledge(MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 1)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(0, sharePartition.timer().size()); + + // Try acknowledging with REJECT type should throw InvalidRecordStateException since they've been released due to acquisition lock timeout. + ackResult = sharePartition.acknowledge(MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 3)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(0, sharePartition.timer().size()); + } + + @Test + public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), + Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Acknowledge with REJECT type. + sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 6, Collections.singletonList((byte) 2)))); + + assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(7L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(8L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(9L).acquisitionLockTimeoutTask()); + assertEquals(3, sharePartition.timer().size()); + + // Acknowledge with ACCEPT type. + sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(8, 9, Collections.singletonList((byte) 1)))); + + assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(7L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(8L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(9L).acquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire will only affect the offsets that have not been acknowledged yet. + TestUtils.waitForCondition( + () -> { + // Check cached state. + Map expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + + return sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && + expectedOffsetStateMap.equals(sharePartition.cachedState().get(5L).offsetState()); + }, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(7L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(8L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(9L).acquisitionLockTimeoutTask()); + } + + @Test + public void testAcquisitionLockOnBatchWithWriteShareGroupStateFailure() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + mockPersisterReadStateMethod(persister); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister) + .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .build(); + + // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5), + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + + assertEquals(1, sharePartition.timer().size()); + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + + // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. + TestUtils.waitForCondition( + () -> sharePartition.timer().size() == 0 && + sharePartition.nextFetchOffset() == 5 && + sharePartition.cachedState().size() == 1 && + sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && + sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + } + + @Test + public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + mockPersisterReadStateMethod(persister); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister) + .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .build(); + + // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns true for acknowledge to pass. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(6, 5), + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + + assertEquals(1, sharePartition.timer().size()); + assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); + + sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(8, 9, Collections.singletonList((byte) 1)))); + + // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. + TestUtils.waitForCondition( + () -> { + Map expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + return sharePartition.timer().size() == 0 && sharePartition.cachedState().size() == 1 && + expectedOffsetStateMap.equals(sharePartition.cachedState().get(5L).offsetState()); + }, + DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, + () -> "Acquisition lock never got released."); + + assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(7L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(8L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(9L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(5L).offsetState().get(10L).acquisitionLockTimeoutTask()); + } + private MemoryRecords memoryRecords(int numOfRecords) { return memoryRecords(numOfRecords, 0); } @@ -1100,6 +1836,15 @@ public class SharePartitionTest { return acquiredRecordsList; } + public void mockPersisterReadStateMethod(Persister persister) { + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 0, 0L, Errors.NONE.code(), Errors.NONE.message(), + Collections.emptyList()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + } + private static class SharePartitionBuilder { private int acquisitionLockTimeoutMs = 30000; @@ -1117,6 +1862,16 @@ public class SharePartitionTest { return this; } + private SharePartitionBuilder withAcquisitionLockTimeoutMs(int acquisitionLockTimeoutMs) { + this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs; + return this; + } + + private SharePartitionBuilder withMaxDeliveryCount(int maxDeliveryCount) { + this.maxDeliveryCount = maxDeliveryCount; + return this; + } + public static SharePartitionBuilder builder() { return new SharePartitionBuilder(); }