KAFKA-16755: Implement lock timeout functionality in SharePartition (#16414)

Implemented acquisition lock timeout functionality in SharePartition. Implemented the following functions -

1. releaseAcquisitionLockOnTimeout - Executed when the acquisition lock timeout is reached; releases the acquired records.
2. releaseAcquisitionLockOnTimeoutForCompleteBatch - Releases acquired records on acquisition lock timeout when the state is maintained at the batch level.
3. releaseAcquisitionLockOnTimeoutForPerOffsetBatch - Releases acquired records on acquisition lock timeout when the state is maintained at the offset level.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Abhinav Dixit 2024-06-25 12:37:28 +05:30 committed by GitHub
parent f4cbf71ea6
commit 3f3b070a6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 873 additions and 1 deletions

View File

@ -1318,7 +1318,119 @@ public class SharePartition {
}
/**
 * Releases the records held by an acquisition lock that has timed out.
 *
 * <p>Runs under the write lock. Every cached batch overlapping the range
 * {@code [firstOffset, lastOffset]} is visited and handed to the batch-level or offset-level
 * release helper, depending on whether the batch tracks per-offset state. The collected state
 * batches are then written out, and the cached state and offsets are refreshed.
 *
 * @param memberId    the member whose acquisition lock timed out
 * @param firstOffset the first offset covered by the timed-out lock
 * @param lastOffset  the last offset covered by the timed-out lock (inclusive)
 */
private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) {
    lock.writeLock().lock();
    try {
        // The batch containing firstOffset is the one with the greatest base offset <= firstOffset.
        Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(firstOffset);
        if (floorEntry == null) {
            log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition);
            return;
        }

        List<PersisterStateBatch> stateBatches = new ArrayList<>();
        NavigableMap<Long, InFlightBatch> overlappingBatches =
            cachedState.subMap(floorEntry.getKey(), true, lastOffset, true);
        for (InFlightBatch inFlightBatch : overlappingBatches.values()) {
            // When batch.firstOffset < start offset <= batch.lastOffset, some of the acquired records
            // must move to archived state regardless of their delivery count, so the batch has to be
            // tracked per offset from here on.
            boolean startOffsetSplitsAcquiredBatch = inFlightBatch.offsetState() == null
                && inFlightBatch.batchState() == RecordState.ACQUIRED
                && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset());
            if (startOffsetSplitsAcquiredBatch) {
                inFlightBatch.maybeInitializeOffsetStateUpdate();
            }

            // Re-read the offset state: it may have been initialized just above.
            if (inFlightBatch.offsetState() != null) {
                // The batch carries a per-offset state map.
                releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset);
            } else {
                // The batch is tracked as a whole.
                releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId);
            }
        }

        if (!stateBatches.isEmpty()) {
            boolean persisted = isWriteShareGroupStateSuccessful(stateBatches);
            if (!persisted) {
                // A failed write does not roll anything back; the in-memory transition stands.
                log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId {}. " +
                    "Proceeding with state transition.", groupId, topicIdPartition, memberId);
            }
        }

        // Refresh the cached state and the start/end offsets now that the lock has been released.
        maybeUpdateCachedStateAndOffsets();
    } finally {
        lock.writeLock().unlock();
    }
}
/**
 * Releases the acquisition lock of a batch whose state is tracked at the batch level.
 *
 * <p>A batch that is not in {@code ACQUIRED} state is skipped. Otherwise the batch is moved to
 * {@code ARCHIVED} when it lies entirely before the start offset, or to {@code AVAILABLE}
 * otherwise. On a successful transition the resulting state is appended to
 * {@code stateBatches} and the batch's acquisition lock timeout task is cleared.
 *
 * @param inFlightBatch the cached batch to release
 * @param stateBatches  accumulator for the state batches that the caller writes out
 * @param memberId      the member whose lock timed out; used here for logging only
 */
private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch,
                                                             List<PersisterStateBatch> stateBatches,
                                                             String memberId) {
    if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
        // Message layout matches the other logs in this class: groupId-topicIdPartition, then memberId.
        log.debug("The batch is not in acquired state while release of acquisition lock on timeout, skipping, batch: {}"
            + " for the share partition: {}-{} memberId: {}", inFlightBatch, groupId, topicIdPartition, memberId);
        return;
    }

    InFlightState updateResult = inFlightBatch.tryUpdateBatchState(
        inFlightBatch.lastOffset() < startOffset ? RecordState.ARCHIVED : RecordState.AVAILABLE,
        false,
        maxDeliveryCount,
        EMPTY_MEMBER_ID);
    if (updateResult == null) {
        // A null result signals that the state transition was rejected.
        log.error("Unable to release acquisition lock on timeout for the batch: {}"
            + " for the share partition: {}-{} memberId: {}", inFlightBatch, groupId, topicIdPartition, memberId);
        return;
    }

    stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
        updateResult.state.id, (short) updateResult.deliveryCount));
    // The timeout task has done its job; clear it so the batch no longer references it.
    updateResult.updateAcquisitionLockTimeoutTask(null);
    // The resulting state is read back from updateResult rather than assumed from the requested state.
    // Anything not archived can be fetched again, so the next fetch offset must be recomputed.
    if (updateResult.state != RecordState.ARCHIVED) {
        findNextFetchOffset.set(true);
    }
}
/**
 * Releases the acquisition lock for the offsets of a batch whose state is tracked per offset.
 *
 * <p>Only offsets inside {@code [firstOffset, lastOffset]} that are currently {@code ACQUIRED}
 * are touched. Each one moves to {@code ARCHIVED} when it is before the start offset, or to
 * {@code AVAILABLE} otherwise. Every successful transition is appended to {@code stateBatches}
 * as a single-offset state batch.
 *
 * @param inFlightBatch the cached batch holding the per-offset state map
 * @param stateBatches  accumulator for the state batches that the caller writes out
 * @param memberId      the member whose lock timed out; used here for logging only
 * @param firstOffset   the first offset covered by the timed-out lock
 * @param lastOffset    the last offset covered by the timed-out lock (inclusive)
 */
private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch,
                                                              List<PersisterStateBatch> stateBatches,
                                                              String memberId,
                                                              long firstOffset,
                                                              long lastOffset) {
    for (Map.Entry<Long, InFlightState> entry : inFlightBatch.offsetState().entrySet()) {
        long offset = entry.getKey();

        // The cached batch may start before the requested range, e.g. a cached batch of
        // offsets 10-14 and a request for 12-13.
        if (offset < firstOffset) {
            continue;
        }
        // Past the requested range: nothing further to process.
        if (offset > lastOffset) {
            break;
        }

        InFlightState currentState = entry.getValue();
        if (currentState.state != RecordState.ACQUIRED) {
            log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {}"
                + " for the share group: {}-{} memberId: {}", offset, inFlightBatch,
                groupId, topicIdPartition, memberId);
            continue;
        }

        RecordState targetState = offset < startOffset ? RecordState.ARCHIVED : RecordState.AVAILABLE;
        InFlightState updateResult = currentState.tryUpdateState(targetState, false, maxDeliveryCount, EMPTY_MEMBER_ID);
        if (updateResult == null) {
            log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {}"
                + " for the share group: {}-{} memberId: {}", offset, inFlightBatch,
                groupId, topicIdPartition, memberId);
            continue;
        }

        stateBatches.add(new PersisterStateBatch(offset, offset,
            updateResult.state.id, (short) updateResult.deliveryCount));
        // The timeout task for this offset is complete; drop the reference to it.
        updateResult.updateAcquisitionLockTimeoutTask(null);
        // Anything not archived can be fetched again, so the next fetch offset must be recomputed.
        if (updateResult.state != RecordState.ARCHIVED) {
            findNextFetchOffset.set(true);
        }
    }
}
// Visible for testing. Should only be used for testing purposes.
@ -1361,6 +1473,11 @@ public class SharePartition {
return stateEpoch;
}
// Visible for testing. Returns the timer held by this share partition; the tests added with
// this change use it to check timer().size() before and after the acquisition lock expires.
Timer timer() {
return timer;
}
private final class AcquisitionLockTimerTask extends TimerTask {
private final long expirationMs;
private final String memberId;

View File

@ -37,6 +37,7 @@ import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.group.share.PersisterStateBatch;
import org.apache.kafka.server.group.share.ReadShareGroupStateResult;
import org.apache.kafka.server.group.share.TopicData;
import org.apache.kafka.server.group.share.WriteShareGroupStateResult;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
@ -80,6 +81,8 @@ public class SharePartitionTest {
private static Timer mockTimer;
private static final Time MOCK_TIME = new MockTime();
private static final short MAX_IN_FLIGHT_MESSAGES = 200;
// Acquisition lock timeout handed to SharePartitionBuilder#withAcquisitionLockTimeoutMs by the timeout tests.
private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100;
// Maximum wait passed to TestUtils.waitForCondition while polling for the acquisition lock to be released.
private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 200;
@BeforeEach
public void setUp() {
@ -1063,6 +1066,739 @@ public class SharePartitionTest {
assertEquals(29, sharePartition.nextFetchOffset());
}
@Test
public void testAcquisitionLockForAcquiringSingleRecord() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    // Acquire a single record; the resulting cached batch is keyed by offset 0.
    FetchPartitionData singleRecord = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(1),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, singleRecord);

    // Acquiring must register exactly one acquisition lock timeout task.
    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Let the lock expire: the batch becomes available again and the task is cleared.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.nextFetchOffset() != 0) {
                return false;
            }
            return sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(0L).batchDeliveryCount() == 1
                && sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null
                && sharePartition.timer().size() == 0;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");
}
@Test
public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    // Acquire a multi-record batch; the resulting cached batch is keyed by offset 10.
    FetchPartitionData multipleRecords = new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, multipleRecords);

    // The whole batch shares a single acquisition lock timeout task.
    assertEquals(1, sharePartition.timer().size());
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());

    // Let the lock expire: the batch becomes available again and the task is cleared.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 10) {
                return false;
            }
            return sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(10L).batchDeliveryCount() == 1
                && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");
}
@Test
public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    // First acquire: offsets 0-4.
    FetchPartitionData firstFetch = new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, firstFetch);
    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Second acquire covers offsets 0-9: 0-4 are already acquired and are ignored, 5-9 form a new batch.
    FetchPartitionData overlappingFetch = new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(10, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, overlappingFetch);
    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertEquals(2, sharePartition.timer().size());

    // Let both locks expire: every acquired record is released.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 0) {
                return false;
            }
            return sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null
                && sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");
}
@Test
public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    FetchPartitionData firstFetch = new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, firstFetch);
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Let the lock expire so the batch becomes available again.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            return sharePartition.nextFetchOffset() == 10
                && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // Acquire the very same batch a second time, using freshly built records.
    FetchPartitionData secondFetch = new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, secondFetch);

    // Re-acquiring must create a new acquisition lock timeout task.
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());
}
@Test
public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder().build();

    FetchPartitionData singleRecord = new FetchPartitionData(Errors.NONE, 10, 0, memoryRecords(1, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, singleRecord);
    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Acknowledge offset 0 with type 2; the assertions below expect the batch to be AVAILABLE afterwards.
    ShareAcknowledgementBatch acknowledgement =
        new ShareAcknowledgementBatch(0, 0, Collections.singletonList((byte) 2));
    sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(acknowledgement));

    // The acknowledgement must cancel the timeout task straight away.
    assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertEquals(0, sharePartition.timer().size());

    assertEquals(0, sharePartition.nextFetchOffset());
    assertEquals(1, sharePartition.cachedState().size());
    assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).batchState());
    assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount());
    assertNull(sharePartition.cachedState().get(0L).offsetState());

    // Allow time for the lock to expire. Nothing in the cached state may change, because the batch
    // is already acknowledged and its timeout task was cancelled.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 0) {
                return false;
            }
            return sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(0L).batchDeliveryCount() == 1
                && sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");
}
@Test
public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder().build();

    // Acquire offsets 5-14 as a single batch.
    FetchPartitionData multipleRecords = new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, multipleRecords);
    assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Acknowledge the complete batch with type 2; the assertions below expect it to be AVAILABLE afterwards.
    ShareAcknowledgementBatch acknowledgement =
        new ShareAcknowledgementBatch(5, 14, Collections.singletonList((byte) 2));
    sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(acknowledgement));

    assertEquals(5, sharePartition.nextFetchOffset());
    assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
    assertEquals(1, sharePartition.cachedState().get(5L).batchDeliveryCount());
    // The acknowledgement must cancel the timeout task straight away.
    assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertEquals(0, sharePartition.timer().size());

    // Allow time for the lock to expire. Nothing in the cached state may change, because the batch
    // is already acknowledged and its timeout task was cancelled.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 5) {
                return false;
            }
            return sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(5L).batchDeliveryCount() == 1
                && sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");
}
@Test
public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    // Offsets 5-6.
    MemoryRecords records1 = memoryRecords(2, 5);
    // Offsets 7-9 are an untracked gap. records2 starts at offset 10, leaves a gap at 15-17
    // and ends with a record at offset 18.
    MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10);
    recordsBuilder.appendWithOffset(18, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
    MemoryRecords records2 = recordsBuilder.build();
    // Offsets 1-2.
    MemoryRecords records3 = memoryRecords(2, 1);

    // Acquire the three batches one after the other; each one adds a timeout task.
    FetchPartitionData fetch3 = new FetchPartitionData(Errors.NONE, 30, 0, records3,
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fetch3);
    assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    FetchPartitionData fetch1 = new FetchPartitionData(Errors.NONE, 30, 0, records1,
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fetch1);
    assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertEquals(2, sharePartition.timer().size());

    FetchPartitionData fetch2 = new FetchPartitionData(Errors.NONE, 30, 0, records2,
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fetch2);
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertEquals(3, sharePartition.timer().size());

    // The gap offsets are deliberately not sent separately, to verify that they are ignored and
    // accepted as per the client acknowledgement.
    ShareAcknowledgementBatch acknowledgement =
        new ShareAcknowledgementBatch(5, 18, Collections.singletonList((byte) 1));
    sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(acknowledgement));

    // Only the batch starting at offset 1 is still waiting on its lock.
    assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Let the remaining lock expire. Only the batch starting at offset 1 is released, because the
    // other records have already been acknowledged.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 1) {
                return false;
            }
            return sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask() == null
                && sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null
                && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(1L).batchState());
    assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState());
    assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(10L).batchState());
}
@Test
public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    // Acquire offsets 10-17 as a single batch.
    FetchPartitionData fullBatch = new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(8, 10),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fullBatch);
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Let the lock expire so the whole batch becomes available again.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 10) {
                return false;
            }
            return sharePartition.cachedState().size() == 1
                && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // Re-acquire only offsets 12-14 of the batch.
    FetchPartitionData subsetBatch = new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(3, 12),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, subsetBatch);

    // Only the re-acquired offsets (12-14) may carry an acquisition lock timeout task.
    for (long offset = 10L; offset <= 17L; offset++) {
        if (offset >= 12L && offset <= 14L) {
            assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(offset).acquisitionLockTimeoutTask());
        } else {
            assertNull(sharePartition.cachedState().get(10L).offsetState().get(offset).acquisitionLockTimeoutTask());
        }
    }
    assertEquals(3, sharePartition.timer().size());

    // Expected end state: every offset is available; 12-14 were delivered twice, the rest once.
    Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
    for (long offset = 10L; offset <= 17L; offset++) {
        short deliveryCount = (offset >= 12L && offset <= 14L) ? (short) 2 : (short) 1;
        expectedOffsetStateMap.put(offset, new InFlightState(RecordState.AVAILABLE, deliveryCount, EMPTY_MEMBER_ID));
    }

    // Let the locks of the re-acquired subset expire.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            return sharePartition.nextFetchOffset() == 10
                && expectedOffsetStateMap.equals(sharePartition.cachedState().get(10L).offsetState());
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // No offset may be left with a timeout task.
    for (long offset = 10L; offset <= 17L; offset++) {
        assertNull(sharePartition.cachedState().get(10L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }
}
@Test
public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOffsets() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    // Offsets 5-6.
    MemoryRecords records1 = memoryRecords(2, 5);
    // Offsets 7-9 are an untracked gap. records2 holds offsets 10-11, 14, 16 and 20, which leaves
    // gaps at 12-13, 15 and 17-19.
    MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10);
    recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
    recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
    recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
    MemoryRecords records2 = recordsBuilder.build();

    FetchPartitionData fetch1 = new FetchPartitionData(Errors.NONE, 30, 0, records1,
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fetch1);
    assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    FetchPartitionData fetch2 = new FetchPartitionData(Errors.NONE, 30, 0, records2,
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fetch2);
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertEquals(2, sharePartition.timer().size());

    // Acknowledge a subset of both batches, including a subset of the gap offsets.
    // One acknowledge type per offset from 6 to 18.
    ShareAcknowledgementBatch acknowledgement = new ShareAcknowledgementBatch(6, 18, Arrays.asList(
        (byte) 1, (byte) 1, (byte) 1, (byte) 1, // offsets 6-9
        (byte) 1, (byte) 1,                     // offsets 10-11
        (byte) 0, (byte) 0,                     // offsets 12-13
        (byte) 1,                               // offset 14
        (byte) 0,                               // offset 15
        (byte) 1,                               // offset 16
        (byte) 0,                               // offset 17
        (byte) 1));                             // offset 18
    sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(acknowledgement));

    // Offset 5 was not acknowledged and keeps its task; offset 6 was acknowledged.
    assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
    assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
    // Offsets 10-18 were covered by the acknowledgement; 19-20 were not and keep their tasks.
    for (long offset = 10L; offset <= 20L; offset++) {
        if (offset <= 18L) {
            assertNull(sharePartition.cachedState().get(10L).offsetState().get(offset).acquisitionLockTimeoutTask());
        } else {
            assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(offset).acquisitionLockTimeoutTask());
        }
    }
    assertEquals(3, sharePartition.timer().size());

    // Expected end state of the batch starting at offset 5.
    Map<Long, InFlightState> expectedOffsetStateMap1 = new HashMap<>();
    expectedOffsetStateMap1.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
    expectedOffsetStateMap1.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));

    // Expected end state of the batch starting at offset 10, grouped by resulting record state.
    Map<Long, InFlightState> expectedOffsetStateMap2 = new HashMap<>();
    for (long offset : new long[] {10L, 11L, 14L, 16L, 18L}) {
        expectedOffsetStateMap2.put(offset, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
    }
    for (long offset : new long[] {12L, 13L, 15L, 17L}) {
        expectedOffsetStateMap2.put(offset, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
    }
    for (long offset : new long[] {19L, 20L}) {
        expectedOffsetStateMap2.put(offset, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
    }

    // Let the locks expire for the offsets that have not been acknowledged yet.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 5) {
                return false;
            }
            return expectedOffsetStateMap1.equals(sharePartition.cachedState().get(5L).offsetState())
                && expectedOffsetStateMap2.equals(sharePartition.cachedState().get(10L).offsetState());
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // No offset in either batch may be left with a timeout task.
    for (long offset = 5L; offset <= 6L; offset++) {
        assertNull(sharePartition.cachedState().get(5L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }
    for (long offset = 10L; offset <= 20L; offset++) {
        assertNull(sharePartition.cachedState().get(10L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }
}
@Test
public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws InterruptedException {
    // Only 2 delivery attempts will be made before archiving the records.
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .withMaxDeliveryCount(2)
        .build();

    // The batch at offsets 0-9 is acquired first so that the SPSO does not move forward when the
    // batch at offsets 10-19 exceeds the max delivery count.
    FetchPartitionData leadingBatch = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, leadingBatch);
    FetchPartitionData firstAttempt = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 10),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, firstAttempt);

    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertEquals(2, sharePartition.timer().size());

    // Let both locks expire: the batch at offset 10 returns to AVAILABLE after its first delivery.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 0) {
                return false;
            }
            return sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(10L).batchDeliveryCount() == 1
                && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // Second delivery attempt for offsets 10-19.
    FetchPartitionData secondAttempt = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 10),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, secondAttempt);

    assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
    assertEquals(2, sharePartition.cachedState().get(10L).batchDeliveryCount());
    assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Let the lock expire again. The second delivery attempt was never acknowledged, so the
    // records have reached the max delivery count and must be archived.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 0) {
                return false;
            }
            return sharePartition.cachedState().get(10L).batchState() == RecordState.ARCHIVED
                && sharePartition.cachedState().get(10L).batchDeliveryCount() == 2
                && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");
}
/**
 * Acquires the batch 0-9, lets its acquisition lock expire, then re-acquires only offsets 0-4 and lets
 * that lock expire as well. With a max delivery count of 2 the re-acquired offsets end up ARCHIVED while
 * offsets 5-9 stay AVAILABLE, and the start offset / next fetch offset move to 5.
 */
@Test
public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records
        .build();

    // First delivery: the complete batch is acquired and guarded by one batch-level timer task.
    FetchPartitionData wholeBatch = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, wholeBatch);
    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Let the first acquisition lock expire; the batch goes back to AVAILABLE with delivery count 1.
    TestUtils.waitForCondition(
        () -> sharePartition.timer().size() == 0
            && sharePartition.nextFetchOffset() == 0
            && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE
            && sharePartition.cachedState().get(0L).batchDeliveryCount() == 1
            && sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null,
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // Second delivery covers only offsets 0-4, so the state is now tracked per offset.
    FetchPartitionData firstHalf = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(5, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, firstHalf);

    // Re-acquired offsets carry a timer task, the untouched ones do not.
    for (long offset = 0; offset <= 4; offset++) {
        assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }
    for (long offset = 5; offset <= 9; offset++) {
        assertNull(sharePartition.cachedState().get(0L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }

    // Allowing acquisition lock to expire to archive the records that reach max delivery count.
    TestUtils.waitForCondition(
        () -> {
            Map<Long, InFlightState> expected = new HashMap<>();
            for (long offset = 0; offset < 5; offset++) {
                expected.put(offset, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
            }
            for (long offset = 5; offset < 10; offset++) {
                expected.put(offset, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
            }
            return sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 &&
                expected.equals(sharePartition.cachedState().get(0L).offsetState());
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // No offset may be left holding a timer task once the lock has expired.
    for (long offset = 0; offset < 10; offset++) {
        assertNull(sharePartition.cachedState().get(0L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }

    // Since only first 5 records from the batch are archived, the batch remains in the cachedState, but the
    // start offset is updated
    assertEquals(5, sharePartition.startOffset());
}
/**
 * Acquires the batch 0-9 twice and lets the acquisition lock expire both times. With a max delivery
 * count of 2 the second expiry is expected to leave the cached state empty and the next fetch offset at 10.
 */
@Test
public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records
        .build();

    // First delivery attempt of the complete batch.
    FetchPartitionData firstDelivery = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, firstDelivery);
    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Allowing acquisition lock to expire.
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (sharePartition.nextFetchOffset() != 0) {
                return false;
            }
            return sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE
                && sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // Second (and last allowed) delivery attempt of the same batch.
    FetchPartitionData secondDelivery = new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, secondDelivery);
    assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Allowing acquisition lock to expire to archive the records that reach max delivery count.
    // After the second failed attempt to acknowledge the record batch successfully, the record batch is archived.
    // Since this is the first batch in the share partition, SPSO moves forward and the cachedState is cleared
    TestUtils.waitForCondition(
        () -> {
            if (sharePartition.timer().size() != 0) {
                return false;
            }
            if (!sharePartition.cachedState().isEmpty()) {
                return false;
            }
            return sharePartition.nextFetchOffset() == 10;
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");
}
/**
 * Acquires offsets 5-9, lets the acquisition lock expire, and then checks that acknowledging the
 * already released records reports an InvalidRecordStateException for both ACCEPT and REJECT.
 */
@Test
public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    FetchPartitionData fetched = new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fetched);
    assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Allowing acquisition lock to expire.
    TestUtils.waitForCondition(
        () -> sharePartition.timer().size() == 0
            && sharePartition.nextFetchOffset() == 5
            && sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE
            && sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null,
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // Acknowledge type 1 is ACCEPT and 3 is REJECT. Either one must be answered with an
    // InvalidRecordStateException carried in the (normally completed) future, because the records
    // were already released by the acquisition lock timeout.
    for (byte ackType : new byte[] {1, 3}) {
        CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(MEMBER_ID,
            Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList(ackType))));
        assertFalse(ackResult.isCompletedExceptionally());
        Optional<Throwable> failure = ackResult.join();
        assertTrue(failure.isPresent());
        assertEquals(InvalidRecordStateException.class, failure.get().getClass());
        // The failed acknowledgement must not schedule a new timer task.
        assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
        assertEquals(0, sharePartition.timer().size());
    }
}
/**
 * Acquires offsets 5-9, releases 5-6 and accepts 8-9 before the acquisition lock expires, and then
 * verifies that the expiry only changes the one offset (7) that was never acknowledged.
 */
@Test
public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedException {
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .build();

    FetchPartitionData fetched = new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5),
        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
    sharePartition.acquire(MEMBER_ID, fetched);
    assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
    assertEquals(1, sharePartition.timer().size());

    // Acknowledge with RELEASE type (2): offsets 5-6 go back to AVAILABLE and lose their timer task,
    // while 7-9 are still acquired and each keeps its own task.
    sharePartition.acknowledge(MEMBER_ID,
        Collections.singletonList(new ShareAcknowledgementBatch(5, 6, Collections.singletonList((byte) 2))));
    for (long offset = 5; offset <= 6; offset++) {
        assertNull(sharePartition.cachedState().get(5L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }
    for (long offset = 7; offset <= 9; offset++) {
        assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }
    assertEquals(3, sharePartition.timer().size());

    // Acknowledge with ACCEPT type (1): offsets 8-9 drop their timer task, leaving offset 7 as the
    // only one that is still guarded by the acquisition lock.
    sharePartition.acknowledge(MEMBER_ID,
        Collections.singletonList(new ShareAcknowledgementBatch(8, 9, Collections.singletonList((byte) 1))));
    for (long offset = 5; offset <= 9; offset++) {
        if (offset == 7) {
            assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(offset).acquisitionLockTimeoutTask());
        } else {
            assertNull(sharePartition.cachedState().get(5L).offsetState().get(offset).acquisitionLockTimeoutTask());
        }
    }
    assertEquals(1, sharePartition.timer().size());

    // Allowing acquisition lock to expire will only affect the offsets that have not been acknowledged yet.
    TestUtils.waitForCondition(
        () -> {
            // Check cached state.
            Map<Long, InFlightState> expected = new HashMap<>();
            for (long offset = 5; offset <= 7; offset++) {
                expected.put(offset, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
            }
            for (long offset = 8; offset <= 9; offset++) {
                expected.put(offset, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
            }
            return sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 &&
                expected.equals(sharePartition.cachedState().get(5L).offsetState());
        },
        DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
        () -> "Acquisition lock never got released.");

    // After the expiry none of the offsets may hold a timer task any more.
    for (long offset = 5; offset <= 9; offset++) {
        assertNull(sharePartition.cachedState().get(5L).offsetState().get(offset).acquisitionLockTimeoutTask());
    }
}
/**
 * Checks the batch-level acquisition lock timeout when the persister's writeState result carries a
 * GROUP_ID_NOT_FOUND partition error: once the lock expires, the batch at offset 5 must still be
 * AVAILABLE with no timer task left, and the next fetch offset must be 5.
 */
@Test
public void testAcquisitionLockOnBatchWithWriteShareGroupStateFailure() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
// readState has to be stubbed before the share partition is built with this persister.
mockPersisterReadStateMethod(persister);
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister)
.withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.build();
// Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
// Acquire a single batch starting at offset 5; it is guarded by one batch-level timer task.
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertEquals(1, sharePartition.timer().size());
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens.
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 5 &&
sharePartition.cachedState().size() == 1 &&
sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE &&
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> "Acquisition lock never got released.");
}
/**
 * Checks the per-offset acquisition lock timeout when the persister's writeState result carries a
 * GROUP_ID_NOT_FOUND partition error. Offsets 5-10 are acquired and 8-9 are acknowledged while
 * writeState still reports no error; writeState is then re-stubbed to report the error before the lock
 * expires. The expected end state is AVAILABLE for 5-7 and 10, ACKNOWLEDGED for 8-9, and no timer task
 * left on any offset.
 */
@Test
public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
// readState has to be stubbed before the share partition is built with this persister.
mockPersisterReadStateMethod(persister);
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister)
.withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.build();
// Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns true for acknowledge to pass.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
// Acquire 6 records starting at offset 5 (offsets 5-10) as a single batch.
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(6, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertEquals(1, sharePartition.timer().size());
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
// Acknowledge only offsets 8-9 with type 1; the expected map below shows them as ACKNOWLEDGED.
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(8, 9, Collections.singletonList((byte) 1))));
// Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false.
// The same result mock is re-stubbed, so this must happen after the acknowledge above and before the lock expires.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
// Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens.
TestUtils.waitForCondition(
() -> {
// Offsets that were never acknowledged (5-7 and 10) are expected to be AVAILABLE again; 8-9 stay ACKNOWLEDGED.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
return sharePartition.timer().size() == 0 && sharePartition.cachedState().size() == 1 &&
expectedOffsetStateMap.equals(sharePartition.cachedState().get(5L).offsetState());
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> "Acquisition lock never got released.");
// Once the lock has expired no offset may still reference a timer task.
assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(7L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(8L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(9L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(10L).acquisitionLockTimeoutTask());
}
/**
 * Convenience overload that delegates to {@code memoryRecords(numOfRecords, 0)}.
 *
 * @param numOfRecords the record count forwarded to the two-argument overload
 * @return whatever the two-argument overload returns for a second argument of 0
 *         (presumably the starting offset of the records - confirm against that overload)
 */
private MemoryRecords memoryRecords(int numOfRecords) {
return memoryRecords(numOfRecords, 0);
}
@ -1100,6 +1836,15 @@ public class SharePartitionTest {
return acquiredRecordsList;
}
/**
 * Stubs {@code persister.readState(...)} for any argument so that it returns an already completed
 * future. The mocked result's {@code topicsData()} is a single {@code TopicData} for
 * {@code TOPIC_ID_PARTITION.topicId()} holding one partition entry created with
 * {@code Errors.NONE} and an empty list.
 *
 * @param persister the Mockito mock whose {@code readState} call is stubbed
 */
public void mockPersisterReadStateMethod(Persister persister) {
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
// NOTE(review): the leading 0, 0, 0L look like partition, state epoch and start offset - confirm against PartitionFactory.
PartitionFactory.newPartitionAllData(0, 0, 0L, Errors.NONE.code(), Errors.NONE.message(),
Collections.emptyList())))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
}
private static class SharePartitionBuilder {
private int acquisitionLockTimeoutMs = 30000;
@ -1117,6 +1862,16 @@ public class SharePartitionTest {
return this;
}
/**
 * Overrides the acquisition lock timeout stored in this builder (the field is initialised to 30000).
 *
 * @param acquisitionLockTimeoutMs the value to store in the builder
 * @return this builder, so calls can be chained
 */
private SharePartitionBuilder withAcquisitionLockTimeoutMs(int acquisitionLockTimeoutMs) {
this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
return this;
}
/**
 * Overrides the max delivery count stored in this builder.
 *
 * @param maxDeliveryCount the value to store in the builder
 * @return this builder, so calls can be chained
 */
private SharePartitionBuilder withMaxDeliveryCount(int maxDeliveryCount) {
this.maxDeliveryCount = maxDeliveryCount;
return this;
}
/**
 * Static factory for the builder.
 *
 * @return a new {@code SharePartitionBuilder} instance
 */
public static SharePartitionBuilder builder() {
return new SharePartitionBuilder();
}