KAFKA-16754: Implemented release acquired records functionality to SharePartition (#16430)

About
Implemented release acquired records functionality in SharePartition. This functionality is used when a share session gets closed, hence all the acquired records should either move to AVAILABLE or ARCHIVED state. Implemented the following functions -

1. releaseAcquiredRecords - This function is executed when the acquisition lock timeout is reached. The function releases the acquired records.
2. releaseAcquiredRecordsForCompleteBatch - Function which releases acquired records maintained at a batch level.
3. releaseAcquiredRecordsForPerOffsetBatch - Function which releases acquired records maintained at an offset level.

Testing
Added unit tests to cover the new functionality added.


Reviewers:  Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Abhinav Dixit 2024-06-27 15:33:46 +05:30 committed by GitHub
parent 9b4f13efbc
commit 49e9bd4a5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 801 additions and 4 deletions

View File

@ -547,12 +547,139 @@ public class SharePartition {
* @return A future which is completed when the records are released. * @return A future which is completed when the records are released.
*/ */
public CompletableFuture<Optional<Throwable>> releaseAcquiredRecords(String memberId) { public CompletableFuture<Optional<Throwable>> releaseAcquiredRecords(String memberId) {
log.trace("Release acquired records request for share partition: {}-{}-{}", groupId, memberId, topicIdPartition); log.trace("Release acquired records request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);
CompletableFuture<Optional<Throwable>> future = new CompletableFuture<>(); Throwable throwable = null;
future.completeExceptionally(new UnsupportedOperationException("Not implemented")); lock.writeLock().lock();
List<InFlightState> updatedStates = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();
return future; try {
RecordState recordState = RecordState.AVAILABLE;
// Iterate over multiple fetched batches. The state can vary per offset entry
for (Map.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
if (inFlightBatch.offsetState() == null
&& inFlightBatch.batchState() == RecordState.ACQUIRED
&& inFlightBatch.batchMemberId().equals(memberId)
&& checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) {
// For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some
// acquired records that need to move to archived state despite their delivery count.
inFlightBatch.maybeInitializeOffsetStateUpdate();
}
if (inFlightBatch.offsetState() != null) {
Optional<Throwable> releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches);
if (releaseAcquiredRecordsThrowable.isPresent()) {
throwable = releaseAcquiredRecordsThrowable.get();
break;
}
continue;
}
Optional<Throwable> releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForCompleteBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches);
if (releaseAcquiredRecordsThrowable.isPresent()) {
throwable = releaseAcquiredRecordsThrowable.get();
break;
}
}
// If the release acquired records is successful then persist state, complete the state transition
// and update the cached state for start offset. Else rollback the state transition.
rollbackOrProcessStateUpdates(throwable, updatedStates, stateBatches);
} finally {
lock.writeLock().unlock();
}
return CompletableFuture.completedFuture(Optional.ofNullable(throwable));
}
private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String memberId,
InFlightBatch inFlightBatch,
RecordState recordState,
List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches) {
log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", inFlightBatch,
groupId, topicIdPartition);
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
// Check if member id is the owner of the offset.
if (!offsetState.getValue().memberId().equals(memberId) && !offsetState.getValue().memberId().equals(EMPTY_MEMBER_ID)) {
log.debug("Member {} is not the owner of offset: {} in batch: {} for the share"
+ " partition: {}-{}. Skipping offset.", memberId, offsetState.getKey(), inFlightBatch, groupId, topicIdPartition);
return Optional.empty();
}
if (offsetState.getValue().state == RecordState.ACQUIRED) {
InFlightState updateResult = offsetState.getValue().startStateTransition(
offsetState.getKey() < startOffset ? RecordState.ARCHIVED : recordState,
false,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state for the offset: {} in batch: {}"
+ " for the share partition: {}-{}", offsetState.getKey(),
inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the offset"));
}
// Successfully updated the state of the offset.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state.id, (short) updateResult.deliveryCount));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
}
}
return Optional.empty();
}
private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String memberId,
InFlightBatch inFlightBatch,
RecordState recordState,
List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches) {
// Check if member id is the owner of the batch.
if (!inFlightBatch.batchMemberId().equals(memberId) && !inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) {
log.debug("Member {} is not the owner of batch record {} for share partition: {}-{}. Skipping batch.",
memberId, inFlightBatch, groupId, topicIdPartition);
return Optional.empty();
}
// Change the state of complete batch since the same state exists for the entire inFlight batch.
log.trace("Releasing acquired records for complete batch {} for the share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
InFlightState updateResult = inFlightBatch.startBatchStateTransition(
inFlightBatch.lastOffset() < startOffset ? RecordState.ARCHIVED : recordState,
false,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state for the batch: {}"
+ " for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the batch"));
}
// Successfully updated the state of the batch.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state.id, (short) updateResult.deliveryCount));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
}
return Optional.empty();
} }
/** /**

View File

@ -1799,6 +1799,676 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(5L).offsetState().get(10L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(5L).offsetState().get(10L).acquisitionLockTimeoutTask());
} }
@Test
public void testReleaseSingleRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 10, 0, memoryRecords(1, 0),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).batchState());
assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(0L).offsetState());
}
@Test
public void testReleaseMultipleRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
assertEquals(1, sharePartition.cachedState().get(5L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(5L).offsetState());
}
@Test
public void testReleaseMultipleAcknowledgedRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records0 = memoryRecords(5, 0);
MemoryRecords records1 = memoryRecords(2, 5);
// Untracked gap of 3 offsets from 7-9.
MemoryRecords records2 = memoryRecords(9, 10);
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records0,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 18, Collections.singletonList((byte) 1))));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(3, sharePartition.cachedState().size());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(10L).batchState());
assertNull(sharePartition.cachedState().get(5L).offsetState());
assertNull(sharePartition.cachedState().get(10L).offsetState());
}
@Test
public void testReleaseAcknowledgedMultipleSubsetRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(2, 5);
// Untracked gap of 3 offsets from 7-9.
MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10);
// Gap from 12-13 offsets.
recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap for 15 offset.
recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap from 17-19 offsets.
recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
// Acknowledging over subset of both batch with subset of gap offsets.
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(6, 18, Arrays.asList(
(byte) 1, (byte) 1, (byte) 1,
(byte) 1, (byte) 1, (byte) 1,
(byte) 0, (byte) 0, (byte) 1,
(byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
}
@Test
public void testReleaseAcquiredRecordsWithAnotherMember() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(1, 5);
// Untracked gap of 3 offsets from 7-9.
MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10);
// Gap from 12-13 offsets.
recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap for 15 offset.
recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap from 17-19 offsets.
recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
sharePartition.acquire("member-2", new FetchPartitionData(Errors.NONE, 30, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
// Acknowledging over subset of second batch with subset of gap offsets.
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(10, 18, Arrays.asList(
(byte) 1, (byte) 1, (byte) 0, (byte) 0,
(byte) 1, (byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
// Release acquired records for "member-1".
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(19, sharePartition.nextFetchOffset());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
// Release acquired records for "member-2".
releaseResult = sharePartition.releaseAcquiredRecords("member-2");
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(5L).batchMemberId());
// Check cached state.
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
}
@Test
public void testReleaseAcquiredRecordsWithAnotherMemberAndSubsetAcknowledged() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(2, 5);
// Untracked gap of 3 offsets from 7-9.
MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10);
// Gap from 12-13 offsets.
recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap for 15 offset.
recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap from 17-19 offsets.
recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
sharePartition.acquire("member-2", new FetchPartitionData(Errors.NONE, 30, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
// Acknowledging over subset of second batch with subset of gap offsets.
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(10, 18, Arrays.asList(
(byte) 1, (byte) 1, (byte) 0, (byte) 0,
(byte) 1, (byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
// Release acquired records for "member-1".
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(19, sharePartition.nextFetchOffset());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
// Ack subset of records by "member-2".
sharePartition.acknowledge("member-2",
Collections.singletonList(new ShareAcknowledgementBatch(5, 5, Collections.singletonList((byte) 1))));
// Release acquired records for "member-2".
releaseResult = sharePartition.releaseAcquiredRecords("member-2");
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(6, sharePartition.nextFetchOffset());
// Check cached state.
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
}
@Test
public void testReleaseAcquiredRecordsForEmptyCachedData() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
// Release a batch when cache is empty.
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
}
@Test
public void testReleaseAcquiredRecordsAfterDifferentAcknowledges() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 6, Collections.singletonList((byte) 2))));
sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(8, 9, Collections.singletonList((byte) 1))));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
}
@Test
public void testMaxDeliveryCountLimitExceededForRecordsSubsetAfterReleaseAcquiredRecords() {
SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build();
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, memoryRecords(10, 0),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
MemoryRecords records2 = memoryRecords(5, 10);
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(
new ShareAcknowledgementBatch(10, 14, Collections.singletonList((byte) 2))));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(10L).batchState());
assertNull(sharePartition.cachedState().get(10L).offsetState());
}
@Test
public void testMaxDeliveryCountLimitExceededForRecordsSubsetAfterReleaseAcquiredRecordsSubset() {
SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build();
// First fetch request with 5 records starting from offset 10.
MemoryRecords records1 = memoryRecords(5, 10);
// Second fetch request with 5 records starting from offset 15.
MemoryRecords records2 = memoryRecords(5, 15);
// third fetch request with 5 records starting from offset20.
MemoryRecords records3 = memoryRecords(5, 20);
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 50, 3, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(Arrays.asList(
new ShareAcknowledgementBatch(13, 16, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(17, 19, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(20, 24, Collections.singletonList((byte) 2))
)));
// Send next batch from offset 13, only 2 records should be acquired.
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
// Send next batch from offset 15, only 2 records should be acquired.
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(10, sharePartition.nextFetchOffset());
assertEquals(3, sharePartition.cachedState().size());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState());
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(15L).batchState());
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(20L).batchMemberId());
assertNull(sharePartition.cachedState().get(20L).offsetState());
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(15L).offsetState());
}
@Test
public void testMaxDeliveryCountLimitExceededForRecordsSubsetCacheCleared() {
SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build();
// First fetch request with 5 records starting from offset 10.
MemoryRecords records1 = memoryRecords(5, 10);
// Second fetch request with 5 records starting from offset 15.
MemoryRecords records2 = memoryRecords(5, 15);
// third fetch request with 5 records starting from offset20.
MemoryRecords records3 = memoryRecords(5, 20);
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 50, 3, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(Arrays.asList(
new ShareAcknowledgementBatch(10, 12, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(13, 16, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(17, 19, Collections.singletonList((byte) 3)),
new ShareAcknowledgementBatch(20, 24, Collections.singletonList((byte) 2))
)));
// Send next batch from offset 13, only 2 records should be acquired.
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
// Send next batch from offset 15, only 2 records should be acquired.
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 40, 3, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(25, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
}
@Test
public void testReleaseAcquiredRecordsSubsetWithAnotherMember() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
sharePartition.acquire("member-1",
new FetchPartitionData(Errors.NONE, 30, 0, memoryRecords(7, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acknowledge("member-1",
Collections.singletonList(new ShareAcknowledgementBatch(5, 7, Collections.singletonList((byte) 1))));
// Release acquired records subset with another member.
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords("member-2");
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(7L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ACQUIRED, (short) 1, "member-1"));
expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ACQUIRED, (short) 1, "member-1"));
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACQUIRED, (short) 1, "member-1"));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACQUIRED, (short) 1, "member-1"));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
}
@Test
public void testReleaseBatchWithWriteShareGroupStateFailure() {
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
// Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
// Due to failure in writeShareGroupState, the cached state should not be updated.
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState());
assertEquals(MEMBER_ID, sharePartition.cachedState().get(5L).batchMemberId());
}
@Test
public void testReleaseOffsetWithWriteShareGroupStateFailure() {
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
// Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns true for acknowledge to pass.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(6, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(8, 9, Collections.singletonList((byte) 1))));
// Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
// Due to failure in writeShareGroupState, the cached state should not be updated.
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(5L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(6L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(7L).state());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(8L).state());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(9L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(10L).state());
assertEquals(MEMBER_ID, sharePartition.cachedState().get(5L).offsetState().get(5L).memberId());
assertEquals(MEMBER_ID, sharePartition.cachedState().get(5L).offsetState().get(6L).memberId());
assertEquals(MEMBER_ID, sharePartition.cachedState().get(5L).offsetState().get(7L).memberId());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(5L).offsetState().get(8L).memberId());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(5L).offsetState().get(9L).memberId());
assertEquals(MEMBER_ID, sharePartition.cachedState().get(5L).offsetState().get(10L).memberId());
}
@Test
public void testAcquisitionLockOnReleasingMultipleRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
assertEquals(1, sharePartition.cachedState().get(5L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(5L).offsetState());
// Acquisition lock timer task would be cancelled by the release acquired records operation.
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
assertEquals(0, sharePartition.timer().size());
}
@Test
public void testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchWithGapOffsets() {
SharePartition sharePartition = SharePartitionBuilder.builder().withAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build();
MemoryRecords records1 = memoryRecords(2, 5);
// Untracked gap of 3 offsets from 7-9.
MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10);
// Gap from 12-13 offsets.
recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap for 15 offset.
recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap from 17-19 offsets.
recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 30, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
// Acknowledging over subset of both batch with subset of gap offsets.
sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(6, 18, Arrays.asList(
(byte) 1, (byte) 1, (byte) 1,
(byte) 1, (byte) 1, (byte) 1,
(byte) 0, (byte) 0, (byte) 1,
(byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
CompletableFuture<Optional<Throwable>> releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID);
assertFalse(releaseResult.isCompletedExceptionally());
assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
// Acquisition lock timer task would be cancelled by the release acquired records operation.
assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(20L).acquisitionLockTimeoutTask());
assertEquals(0, sharePartition.timer().size());
}
private MemoryRecords memoryRecords(int numOfRecords) { private MemoryRecords memoryRecords(int numOfRecords) {
return memoryRecords(numOfRecords, 0); return memoryRecords(numOfRecords, 0);
} }