KAFKA-16753: Implement share acknowledge API in partition (KIP-932) (#16339)

The share-partition leader keeps track of the state and delivery attempts for in-flight records. Delivery-attempt tracking follows at-least-once semantics.

The consumer processes the records and acknowledges them upon successful consumption. A successful acknowledgement triggers a transition to the "Acknowledged" state.

This change implements the functionality to apply the offset/batch acknowledgements in a request to the in-memory cached state.
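
For illustration, the minimal Java sketch below mirrors how the new code maps the wire-level acknowledge types (GAP, ACCEPT, RELEASE, REJECT) to in-flight record states, as done by the fetchRecordState helper added in this change. The class and method names in the sketch are placeholders for illustration only, not the committed Kafka API.

// Minimal, self-contained sketch of the acknowledge-type to record-state mapping,
// mirroring the fetchRecordState logic added in this change. Names here are
// illustrative placeholders, not the committed Kafka classes.
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AcknowledgeTypeMappingSketch {

    // Simplified stand-in for the SharePartition.RecordState enum.
    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Maps the wire-level acknowledge type byte to the in-flight record state:
    // 0 = GAP, 1 = ACCEPT, 2 = RELEASE, 3 = REJECT.
    static RecordState recordStateFor(byte acknowledgeType) {
        switch (acknowledgeType) {
            case 1: return RecordState.ACKNOWLEDGED;   // ACCEPT
            case 2: return RecordState.AVAILABLE;      // RELEASE, eligible for re-delivery
            case 3:                                    // REJECT
            case 0: return RecordState.ARCHIVED;       // GAP
            default: throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType);
        }
    }

    public static void main(String[] args) {
        // A client can acknowledge a batch (offsets 10-14) with one state per offset.
        long firstOffset = 10;
        List<Byte> acknowledgeTypes = List.of((byte) 1, (byte) 1, (byte) 2, (byte) 0, (byte) 1);

        Map<Long, RecordState> perOffsetState = new TreeMap<>();
        for (int i = 0; i < acknowledgeTypes.size(); i++) {
            perOffsetState.put(firstOffset + i, recordStateFor(acknowledgeTypes.get(i)));
        }
        // Prints: {10=ACKNOWLEDGED, 11=ACKNOWLEDGED, 12=AVAILABLE, 13=ARCHIVED, 14=ACKNOWLEDGED}
        System.out.println(perOffsetState);
    }
}

RELEASE maps back to AVAILABLE so released records become eligible for re-delivery, while REJECT and GAP move records to the terminal ARCHIVED state.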

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Apoorv Mittal 2024-06-18 18:07:59 +01:00 committed by GitHub
parent c4a3d2475f
commit f2a552a1eb
5 changed files with 1428 additions and 8 deletions
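
A central design point of the change is that acknowledgement state transitions are applied tentatively and only committed once the share-group state has been persisted; otherwise they are rolled back (see startStateTransition, completeStateTransition and rollbackOrProcessStateUpdates below). The following sketch is a simplified, hypothetical illustration of that two-phase pattern, not the committed InFlightState class.

// Illustrative sketch of the two-phase state transition used for acknowledgements:
// a transition is started tentatively and is either committed or rolled back once
// the share-group state has (or has not) been persisted. Names are placeholders.
public class TwoPhaseStateSketch {

    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    static class InFlightState {
        RecordState state = RecordState.ACQUIRED;
        int deliveryCount = 1;
        String memberId = "member-1";
        private InFlightState rollback; // snapshot taken before a tentative transition

        // Phase 1: remember the current values, then apply the tentative transition.
        void startStateTransition(RecordState newState, String newMemberId) {
            InFlightState snapshot = new InFlightState();
            snapshot.state = state;
            snapshot.deliveryCount = deliveryCount;
            snapshot.memberId = memberId;
            rollback = snapshot;
            state = newState;
            memberId = newMemberId;
        }

        // Phase 2: keep the new values on commit, or restore the snapshot on failure.
        void completeStateTransition(boolean commit) {
            if (!commit) {
                state = rollback.state;
                deliveryCount = rollback.deliveryCount;
                memberId = rollback.memberId;
            }
            rollback = null;
        }
    }

    public static void main(String[] args) {
        InFlightState inFlight = new InFlightState();
        inFlight.startStateTransition(RecordState.ACKNOWLEDGED, "");
        boolean persisted = false; // e.g. the persister write failed
        inFlight.completeStateTransition(persisted);
        System.out.println(inFlight.state); // ACQUIRED: the tentative change was rolled back
    }
}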


@@ -19,9 +19,21 @@ package kafka.server.share;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.GroupTopicPartitionData;
import org.apache.kafka.server.group.share.PartitionErrorData;
import org.apache.kafka.server.group.share.PartitionFactory;
import org.apache.kafka.server.group.share.PartitionStateBatchData;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.group.share.PersisterStateBatch;
import org.apache.kafka.server.group.share.TopicData;
import org.apache.kafka.server.group.share.WriteShareGroupStateParameters;
import org.apache.kafka.server.group.share.WriteShareGroupStateResult;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
@@ -31,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -38,6 +51,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -188,6 +202,11 @@ public class SharePartition {
*/
private final Time time;
/**
* The persister is used to persist the state of the share partition to disk.
*/
private final Persister persister;
/**
* The share partition start offset specifies the partition start offset from which the records
* are cached in the cachedState of the sharePartition.
@@ -218,6 +237,7 @@ public class SharePartition {
int recordLockDurationMs,
Timer timer,
Time time,
Persister persister,
ReplicaManager replicaManager
) {
this.groupId = groupId;
@@ -231,6 +251,7 @@ public class SharePartition {
this.recordLockDurationMs = recordLockDurationMs;
this.timer = timer;
this.time = time;
this.persister = persister;
this.replicaManager = replicaManager;
// Initialize the partition.
initialize();
@@ -456,10 +477,68 @@ public class SharePartition {
) {
log.trace("Acknowledgement batch request for share partition: {}-{}", groupId, topicIdPartition);
Throwable throwable = null;
lock.writeLock().lock();
List<InFlightState> updatedStates = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();
try {
// Avoided using an enhanced for loop as we need to check if the last batch has offsets
// in the range.
for (int i = 0; i < acknowledgementBatch.size(); i++) {
ShareAcknowledgementBatch batch = acknowledgementBatch.get(i);
// Client can either send a single entry in acknowledgeTypes which represents the state
// of the complete batch or can send individual offsets state.
Map<Long, RecordState> recordStateMap;
try {
recordStateMap = fetchRecordStateMapForAcknowledgementBatch(batch);
} catch (IllegalArgumentException e) {
log.debug("Invalid acknowledge type: {} for share partition: {}-{}",
batch.acknowledgeTypes(), groupId, topicIdPartition);
throwable = new InvalidRequestException("Invalid acknowledge type: " + batch.acknowledgeTypes());
break;
}
if (batch.lastOffset() < startOffset) {
log.trace("All offsets in the acknowledgement batch {} are already archived: {}-{}",
batch, groupId, topicIdPartition);
continue;
}
// Fetch the sub-map from the cached map for the batch to acknowledge. The sub-map can
// be a full match, subset or spans over multiple fetched batches.
NavigableMap<Long, InFlightBatch> subMap;
try {
subMap = fetchSubMapForAcknowledgementBatch(batch, i == acknowledgementBatch.size() - 1);
} catch (InvalidRecordStateException | InvalidRequestException e) {
throwable = e;
break;
}
// Acknowledge the records for the batch.
Optional<Throwable> ackThrowable = acknowledgementBatchRecords(
memberId,
batch,
recordStateMap,
subMap,
updatedStates,
stateBatches
);
if (ackThrowable.isPresent()) {
throwable = ackThrowable.get();
break;
}
}
// If the acknowledgement is successful then persist state, complete the state transition
// and update the cached state for start offset. Else rollback the state transition.
rollbackOrProcessStateUpdates(throwable, updatedStates, stateBatches);
} finally {
lock.writeLock().unlock();
}
return CompletableFuture.completedFuture(Optional.ofNullable(throwable));
}
/**
@@ -632,6 +711,524 @@ public class SharePartition {
return inFlightBatch.firstOffset() >= firstOffsetToCompare && inFlightBatch.lastOffset() <= lastOffsetToCompare;
}
/**
* Check if the start offset has moved and is within the request's first and last offset.
*
* @param batchFirstOffset The first offset of the batch.
* @param batchLastOffset The last offset of the batch.
*
* @return True if the start offset has moved and is within the request's first and last offset, false otherwise.
*/
private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
return batchFirstOffset < startOffset && batchLastOffset >= startOffset;
}
private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
ShareAcknowledgementBatch batch) {
// Client can either send a single entry in acknowledgeTypes which represents the state
// of the complete batch or can send individual offsets state. Construct a map with record state
// for each offset in the batch, if single acknowledge type is sent, the map will have only one entry.
Map<Long, RecordState> recordStateMap = new HashMap<>();
for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
recordStateMap.put(batch.firstOffset() + index,
fetchRecordState(batch.acknowledgeTypes().get(index)));
}
return recordStateMap;
}
private static RecordState fetchRecordState(byte acknowledgeType) {
switch (acknowledgeType) {
case 1 /* ACCEPT */:
return RecordState.ACKNOWLEDGED;
case 2 /* RELEASE */:
return RecordState.AVAILABLE;
case 3 /* REJECT */:
case 0 /* GAP */:
return RecordState.ARCHIVED;
default:
throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType);
}
}
private NavigableMap<Long, InFlightBatch> fetchSubMapForAcknowledgementBatch(
ShareAcknowledgementBatch batch,
boolean isLastBatch
) {
lock.writeLock().lock();
try {
// Find the floor batch record for the request batch. The request batch could be
// for a subset of the batch i.e. cached batch of offset 10-14 and request batch
// of 12-13. Hence, floor entry is fetched to find the sub-map.
Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(batch.firstOffset());
if (floorOffset == null) {
boolean hasStartOffsetMoved = checkForStartOffsetWithinBatch(batch.firstOffset(), batch.lastOffset());
if (hasStartOffsetMoved) {
// If the start offset has been moved and within the request batch then fetch
// the floor entry from start offset and acknowledge cached offsets. Consider
// the case where the start offset has moved from 0 to 10, with the cached batch
// of 0 - 5, 5 - 10, 10 - 12, 12 - 15. The request batch for acknowledgement is 5 - 15,
// then post acquisition lock timeout the cache will have data only from 10 to 15.
// Hence, we need to fetch the floor entry from start offset.
floorOffset = cachedState.floorEntry(startOffset);
} else {
log.debug("Batch record {} not found for share partition: {}-{}", batch, groupId,
topicIdPartition);
throw new InvalidRecordStateException(
"Batch record not found. The request batch offsets are not found in the cache.");
}
}
NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, batch.lastOffset(), true);
// Validate if the request batch has the first offset greater than the last offset of the last
// fetched cached batch; if so, there are offsets in the request that are past the acquired records.
if (subMap.lastEntry().getValue().lastOffset < batch.firstOffset()) {
log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
throw new InvalidRequestException("Batch record not found. The first offset in request is past acquired records.");
}
// Validate if the request batch has the last offset greater than the last offset of
// the last fetched cached batch, then there will be offsets in the request that cannot
// be found in the fetched batches.
if (isLastBatch && batch.lastOffset() > subMap.lastEntry().getValue().lastOffset) {
log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
throw new InvalidRequestException("Batch record not found. The last offset in request is past acquired records.");
}
return subMap;
} finally {
lock.writeLock().unlock();
}
}
private Optional<Throwable> acknowledgementBatchRecords(
String memberId,
ShareAcknowledgementBatch batch,
Map<Long, RecordState> recordStateMap,
NavigableMap<Long, InFlightBatch> subMap,
final List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches
) {
Optional<Throwable> throwable;
lock.writeLock().lock();
try {
// The acknowledgement batch either is exact fetch equivalent batch (mostly), subset
// or spans over multiple fetched batches. The state can vary per offset itself from
// the fetched batch in case of subset or client sent individual offsets state.
for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
// If startOffset has moved ahead of the in-flight batch, skip the batch.
if (inFlightBatch.lastOffset() < startOffset) {
log.trace("All offsets in the inflight batch {} are already archived: {}-{}",
inFlightBatch, groupId, topicIdPartition);
continue;
}
// Validate if the requested member id is the owner of the batch.
if (inFlightBatch.offsetState() == null) {
throwable = validateAcknowledgementBatchMemberId(memberId, inFlightBatch);
if (throwable.isPresent()) {
return throwable;
}
}
// Determine if the in-flight batch is a full match from the request batch.
boolean fullMatch = checkForFullMatch(inFlightBatch, batch.firstOffset(), batch.lastOffset());
boolean isPerOffsetClientAck = batch.acknowledgeTypes().size() > 1;
boolean hasStartOffsetMoved = checkForStartOffsetWithinBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset);
// Maintain state per offset if the inflight batch is not a full match or the
// offset state is managed or client sent individual offsets state or
// the start offset is within this in-flight batch.
if (!fullMatch || inFlightBatch.offsetState() != null || isPerOffsetClientAck || hasStartOffsetMoved) {
log.debug("Subset or offset tracked batch record found for acknowledgement,"
+ " batch: {}, request offsets - first: {}, last: {}, client per offset"
+ "state {} for the share partition: {}-{}", inFlightBatch, batch.firstOffset(),
batch.lastOffset(), isPerOffsetClientAck, groupId, topicIdPartition);
if (inFlightBatch.offsetState() == null) {
// Though the request is a subset of in-flight batch but the offset
// tracking has not been initialized yet which means that we could only
// acknowledge subset of offsets from the in-flight batch but only if the
// complete batch is acquired yet. Hence, do a pre-check to avoid exploding
// the in-flight offset tracking unnecessarily.
if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
log.debug("The batch is not in the acquired state: {} for share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The subset batch is not in the acquired state."));
}
// The request batch is a subset or per offset state is managed hence update
// the offsets state in the in-flight batch.
inFlightBatch.maybeInitializeOffsetStateUpdate();
}
throwable = acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch,
recordStateMap, updatedStates, stateBatches);
} else {
// The in-flight batch is a full match hence change the state of the complete batch.
throwable = acknowledgeCompleteBatch(batch, inFlightBatch,
recordStateMap.get(batch.firstOffset()), updatedStates, stateBatches);
}
if (throwable.isPresent()) {
return throwable;
}
}
} finally {
lock.writeLock().unlock();
}
return Optional.empty();
}
private Optional<Throwable> validateAcknowledgementBatchMemberId(
String memberId,
InFlightBatch inFlightBatch
) {
// EMPTY_MEMBER_ID is used to indicate that the batch is not in acquired state.
if (inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) {
log.debug("The batch is not in the acquired state: {} for share partition: {}-{}. Empty member id for batch.",
inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The batch is not in the acquired state."));
}
if (!inFlightBatch.batchMemberId().equals(memberId)) {
log.debug("Member {} is not the owner of batch record {} for share partition: {}-{}",
memberId, inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException("Member is not the owner of batch record"));
}
return Optional.empty();
}
private Optional<Throwable> acknowledgePerOffsetBatchRecords(
String memberId,
ShareAcknowledgementBatch batch,
InFlightBatch inFlightBatch,
Map<Long, RecordState> recordStateMap,
List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches
) {
lock.writeLock().lock();
try {
// Fetch the first record state from the map to be used as default record state in case the
// offset record state is not provided by client.
RecordState recordStateDefault = recordStateMap.get(batch.firstOffset());
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
// 1. For the first batch which might have offsets prior to the request base
// offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
// 2. Skip the offsets which are below the start offset of the share partition
if (offsetState.getKey() < batch.firstOffset() || offsetState.getKey() < startOffset) {
continue;
}
if (offsetState.getKey() > batch.lastOffset()) {
// No further offsets to process.
break;
}
if (offsetState.getValue().state != RecordState.ACQUIRED) {
log.debug("The offset is not acquired, offset: {} batch: {} for the share"
+ " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId,
topicIdPartition);
return Optional.of(new InvalidRecordStateException(
"The batch cannot be acknowledged. The offset is not acquired."));
}
// Check if member id is the owner of the offset.
if (!offsetState.getValue().memberId.equals(memberId)) {
log.debug("Member {} is not the owner of offset: {} in batch: {} for the share"
+ " partition: {}-{}", memberId, offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition);
return Optional.of(
new InvalidRecordStateException("Member is not the owner of offset"));
}
// Determine the record state for the offset. If the per offset record state is not provided
// by the client, then use the batch record state.
RecordState recordState =
recordStateMap.size() > 1 ? recordStateMap.get(offsetState.getKey()) :
recordStateDefault;
InFlightState updateResult = offsetState.getValue().startStateTransition(
recordState,
false,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
if (updateResult == null) {
log.debug("Unable to acknowledge records for the offset: {} in batch: {}"
+ " for the share partition: {}-{}", offsetState.getKey(),
inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException(
"Unable to acknowledge records for the batch"));
}
// Successfully updated the state of the offset.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state.id, (short) updateResult.deliveryCount));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
}
} finally {
lock.writeLock().unlock();
}
return Optional.empty();
}
private Optional<Throwable> acknowledgeCompleteBatch(
ShareAcknowledgementBatch batch,
InFlightBatch inFlightBatch,
RecordState recordState,
List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches
) {
lock.writeLock().lock();
try {
// The in-flight batch is a full match hence change the state of the complete batch.
log.trace("Acknowledging complete batch record {} for the share partition: {}-{}",
batch, groupId, topicIdPartition);
if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
log.debug("The batch is not in the acquired state: {} for share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException(
"The batch cannot be acknowledged. The batch is not in the acquired state."));
}
// Change the state of complete batch since the same state exists for the entire inFlight batch.
// The member id is reset to EMPTY_MEMBER_ID irrespective of the acknowledge type as the batch is
// either released or moved to a state where member id existence is not important. The member id
// is only important when the batch is acquired.
InFlightState updateResult = inFlightBatch.startBatchStateTransition(
recordState,
false,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
if (updateResult == null) {
log.debug("Unable to acknowledge records for the batch: {} with state: {}"
+ " for the share partition: {}-{}", inFlightBatch, recordState, groupId,
topicIdPartition);
return Optional.of(
new InvalidRecordStateException("Unable to acknowledge records for the batch"));
}
// Successfully updated the state of the batch.
updatedStates.add(updateResult);
stateBatches.add(
new PersisterStateBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset,
updateResult.state.id, (short) updateResult.deliveryCount));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the nextFetchOffset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
} finally {
lock.writeLock().unlock();
}
return Optional.empty();
}
// Visible for testing
void rollbackOrProcessStateUpdates(
Throwable throwable,
List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches
) {
lock.writeLock().lock();
try {
if (throwable != null || !isWriteShareGroupStateSuccessful(stateBatches)) {
// Log in DEBUG to avoid flooding of logs for a faulty client.
log.debug("Request failed for updating state, rollback any changed state"
+ " for the share partition: {}-{}", groupId, topicIdPartition);
updatedStates.forEach(state -> state.completeStateTransition(false));
} else {
log.trace("State change request successful for share partition: {}-{}",
groupId, topicIdPartition);
updatedStates.forEach(state -> {
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
maybeUpdateCachedStateAndOffsets();
}
} finally {
lock.writeLock().unlock();
}
}
private void maybeUpdateCachedStateAndOffsets() {
lock.writeLock().lock();
try {
if (!canMoveStartOffset()) {
return;
}
// This will help to find the next position for the startOffset.
// The new position of startOffset will be lastOffsetAcknowledged + 1
long lastOffsetAcknowledged = findLastOffsetAcknowledged();
// If lastOffsetAcknowledged is -1, this means we cannot move the startOffset ahead
if (lastOffsetAcknowledged == -1) {
return;
}
// This is true if all records in the cachedState have been acknowledged (either Accept or Reject).
// The resulting action should be to empty the cachedState altogether
long lastCachedOffset = cachedState.lastEntry().getValue().lastOffset();
if (lastOffsetAcknowledged == lastCachedOffset) {
startOffset = lastCachedOffset + 1; // The next offset that will be fetched and acquired in the share partition
endOffset = lastCachedOffset + 1;
cachedState.clear();
// Nothing further to do.
return;
}
/*
The cachedState contains some records that are yet to be acknowledged, and thus should
not be removed. Only a subMap will be removed from the cachedState. The logic to remove
batches from cachedState is as follows:
a) Only full batches can be removed from the cachedState. For example, if there is a batch (0-99)
and 0-49 records are acknowledged (ACCEPT or REJECT), the first 50 records will not be removed
from the cachedState. Instead, the startOffset will be moved to 50, but the batch will only
be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
*/
// Since only a subMap will be removed, we need to find the first and last keys of that subMap
long firstKeyToRemove = cachedState.firstKey();
long lastKeyToRemove;
NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
lastKeyToRemove = entry.getKey();
} else {
startOffset = lastOffsetAcknowledged + 1;
if (entry.getKey().equals(cachedState.firstKey())) {
// If the first batch in cachedState has some records yet to be acknowledged,
// then nothing should be removed from cachedState
lastKeyToRemove = -1;
} else {
lastKeyToRemove = cachedState.lowerKey(entry.getKey());
}
}
if (lastKeyToRemove != -1) {
NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(firstKeyToRemove, true, lastKeyToRemove, true);
for (Long key : subMap.keySet()) {
cachedState.remove(key);
}
}
} finally {
lock.writeLock().unlock();
}
}
private boolean canMoveStartOffset() {
// The Share Partition Start Offset may be moved after acknowledgement request is complete.
// The following conditions need to be met to move the startOffset:
// 1. When the cachedState is not empty.
// 2. When the acknowledgement type for the records is either ACCEPT or REJECT.
// 3. When all the previous records have been acknowledged (ACCEPT or REJECT).
if (cachedState.isEmpty()) {
return false;
}
NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(startOffset);
if (entry == null) {
log.error("The start offset: {} is not found in the cached state for share partition: {}-{}."
+ " Cannot move the start offset.", startOffset, groupId, topicIdPartition);
return false;
}
RecordState startOffsetState = entry.getValue().offsetState == null ?
entry.getValue().batchState() :
entry.getValue().offsetState().get(startOffset).state();
return isRecordStateAcknowledged(startOffsetState);
}
/**
* The record state is considered acknowledged if it is either acknowledged or archived.
* These are terminal states for the record.
*
* @param recordState The record state to check.
*
* @return True if the record state is acknowledged or archived, false otherwise.
*/
private boolean isRecordStateAcknowledged(RecordState recordState) {
return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED;
}
private long findLastOffsetAcknowledged() {
lock.readLock().lock();
long lastOffsetAcknowledged = -1;
try {
for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
if (inFlightBatch.offsetState() == null) {
if (!isRecordStateAcknowledged(inFlightBatch.batchState())) {
return lastOffsetAcknowledged;
}
lastOffsetAcknowledged = inFlightBatch.lastOffset();
} else {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
if (!isRecordStateAcknowledged(offsetState.getValue().state())) {
return lastOffsetAcknowledged;
}
lastOffsetAcknowledged = offsetState.getKey();
}
}
}
} finally {
lock.readLock().unlock();
}
return lastOffsetAcknowledged;
}
// Visible for testing
boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches) {
WriteShareGroupStateResult response;
try {
response = persister.writeState(new WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
).build()).build()).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to write the share group state for share partition: {}-{}", groupId, topicIdPartition, e);
throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
groupId, topicIdPartition), e);
}
if (response == null || response.topicsData() == null || response.topicsData().size() != 1) {
log.error("Failed to write the share group state for share partition: {}-{}. Invalid state found: {}",
groupId, topicIdPartition, response);
throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
groupId, topicIdPartition));
}
TopicData<PartitionErrorData> state = response.topicsData().get(0);
if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1
|| state.partitions().get(0).partition() != topicIdPartition.partition()) {
log.error("Failed to write the share group state for share partition: {}-{}. Invalid topic partition response: {}",
groupId, topicIdPartition, response);
throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
groupId, topicIdPartition));
}
PartitionErrorData partitionData = state.partitions().get(0);
if (partitionData.errorCode() != Errors.NONE.code()) {
Exception exception = Errors.forCode(partitionData.errorCode()).exception();
log.error("Failed to write the share group state for share partition: {}-{} due to exception",
groupId, topicIdPartition, exception);
return false;
}
return true;
}
private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) {
return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs);
}
@@ -772,6 +1369,14 @@ public class SharePartition {
return batchState.deliveryCount;
}
// Visible for testing.
AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
return batchState.acquisitionLockTimeoutTask;
}
// Visible for testing.
NavigableMap<Long, InFlightState> offsetState() {
return offsetState;
@@ -784,6 +1389,14 @@ public class SharePartition {
return batchState.tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
}
private InFlightState startBatchStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount,
String newMemberId) {
if (batchState == null) {
throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
}
return batchState.startStateTransition(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
}
private void maybeInitializeOffsetStateUpdate() {
if (offsetState == null) {
offsetState = new ConcurrentSkipListMap<>();
@@ -841,6 +1454,9 @@ public class SharePartition {
private int deliveryCount;
// The member id of the client that is fetching/acknowledging the record.
private String memberId;
// The state of the records before the transition. In case we need to revert an in-flight state, we revert the above
// attributes of InFlightState to this state, namely - state, deliveryCount and memberId.
private InFlightState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
@@ -856,6 +1472,25 @@ public class SharePartition {
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
// Visible for testing.
RecordState state() {
return state;
}
// Visible for testing.
int deliveryCount() {
return deliveryCount;
}
String memberId() {
return memberId;
}
// Visible for testing.
TimerTask acquisitionLockTimeoutTask() {
return acquisitionLockTimeoutTask;
}
void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
@@ -893,6 +1528,39 @@ public class SharePartition {
}
}
private InFlightState startStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
return tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
}
private void completeStateTransition(boolean commit) {
if (commit) {
rollbackState = null;
return;
}
state = rollbackState.state;
deliveryCount = rollbackState.deliveryCount;
memberId = rollbackState.memberId;
rollbackState = null;
}
@Override
public int hashCode() {
return Objects.hash(state, deliveryCount, memberId);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InFlightState that = (InFlightState) o;
return state == that.state && deliveryCount == that.deliveryCount && memberId.equals(that.memberId);
}
@Override
public String toString() {
return "InFlightState(" +


@@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
@@ -102,15 +103,21 @@ public class SharePartitionManager implements AutoCloseable {
*/
private final int maxDeliveryCount;
/**
* The persister is used to persist the share partition state.
*/
private final Persister persister;
public SharePartitionManager(
ReplicaManager replicaManager,
Time time,
ShareSessionCache cache,
int recordLockDurationMs,
int maxDeliveryCount,
int maxInFlightMessages,
Persister persister
) {
this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages, persister);
}
SharePartitionManager(
@@ -120,7 +127,8 @@ public class SharePartitionManager implements AutoCloseable {
Map<SharePartitionKey, SharePartition> partitionCacheMap,
int recordLockDurationMs,
int maxDeliveryCount,
int maxInFlightMessages,
Persister persister
) {
this.replicaManager = replicaManager;
this.time = time;
@@ -131,6 +139,7 @@ public class SharePartitionManager implements AutoCloseable {
this.recordLockDurationMs = recordLockDurationMs;
this.maxDeliveryCount = maxDeliveryCount;
this.maxInFlightMessages = maxInFlightMessages;
this.persister = persister;
}
/**


@@ -32,6 +32,8 @@ import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.NoOpShareStatePersister;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;
import org.junit.jupiter.api.Test;
@@ -904,6 +906,7 @@ public class SharePartitionManagerTest {
private Time time = new MockTime();
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
private Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private Persister persister = NoOpShareStatePersister.getInstance();
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
this.replicaManager = replicaManager;
@@ -925,12 +928,17 @@ public class SharePartitionManagerTest {
return this;
}
private SharePartitionManagerBuilder withShareGroupPersister(Persister persister) {
this.persister = persister;
return this;
}
public static SharePartitionManagerBuilder builder() {
return new SharePartitionManagerBuilder();
}
public SharePartitionManager build() {
return new SharePartitionManager(replicaManager, time, cache, partitionCacheMap, RECORD_LOCK_DURATION_MS, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, persister);
}
}
}


@@ -17,10 +17,13 @@
package kafka.server.share;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartition.InFlightState;
import kafka.server.share.SharePartition.RecordState;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
@@ -28,6 +31,9 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.NoOpShareStatePersister;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
@@ -40,15 +46,21 @@ import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -317,6 +329,598 @@ public class SharePartitionTest {
assertTrue(sharePartition.maybeAcquireFetchLock());
}
@Test
public void testAcknowledgeSingleRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(1, 0);
MemoryRecords records2 = memoryRecords(1, 1);
// Another batch is acquired because if there is only 1 batch, and it is acknowledged, the batch will be removed from cachedState
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 10, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 10, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(1, 1, Collections.singletonList((byte) 1))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
assertEquals(2, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(1L).batchState());
assertEquals(1, sharePartition.cachedState().get(1L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(1L).offsetState());
}
@Test
public void testAcknowledgeMultipleRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records = memoryRecords(10, 5);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 14, Collections.singletonList((byte) 1))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
assertEquals(15, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
}
@Test
public void testAcknowledgeMultipleRecordBatchWithGapOffsets() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(2, 5);
// Untracked gap of 3 offsets from 7-9.
MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10);
// Gap from 15-17 offsets.
recordsBuilder.appendWithOffset(18, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 30, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), result.join().toArray());
assertEquals(7, sharePartition.nextFetchOffset());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 30, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), result.join().toArray());
assertEquals(19, sharePartition.nextFetchOffset());
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Arrays.asList(
new ShareAcknowledgementBatch(5, 6, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(10, 18, Arrays.asList(
(byte) 2, (byte) 2, (byte) 2,
(byte) 2, (byte) 2, (byte) 0,
(byte) 0, (byte) 0, (byte) 1
))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState());
assertNull(sharePartition.cachedState().get(5L).offsetState());
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
}
@Test
public void testAcknowledgeMultipleSubsetRecordBatchWithGapOffsets() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(2, 5);
// Untracked gap of 3 offsets from 7-9.
MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10);
// Gap from 12-13 offsets.
recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap for 15 offset.
recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
// Gap from 17-19 offsets.
recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 30, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), result.join().toArray());
assertEquals(7, sharePartition.nextFetchOffset());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 30, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), result.join().toArray());
assertEquals(21, sharePartition.nextFetchOffset());
// Acknowledging over subset of both batch with subset of gap offsets.
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(6, 18, Arrays.asList(
// TODO: NOTE - untracked gap of 3 offsets from 7-9 has no effect on acknowledgment
// irrespective of acknowledgement type provided. While acquiring, the log start
// offset should be used to determine such gaps.
(byte) 1, (byte) 1, (byte) 1,
(byte) 1, (byte) 1, (byte) 1,
(byte) 0, (byte) 0, (byte) 1,
(byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
assertEquals(21, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(5L).batchState());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
expectedOffsetStateMap.put(20L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
}
@Test
public void testAcknowledgeOutOfRangeCachedData() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
// Acknowledge a batch when cache is empty.
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(0, 15, Collections.singletonList((byte) 3))));
assertFalse(ackResult.isCompletedExceptionally());
assertTrue(ackResult.join().isPresent());
assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass());
MemoryRecords records = memoryRecords(5, 5);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
// Cached data with offset 5-9 should exist.
assertEquals(1, sharePartition.cachedState().size());
assertNotNull(sharePartition.cachedState().get(5L));
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(20, 25, Collections.singletonList((byte) 3))));
assertFalse(ackResult.isCompletedExceptionally());
assertTrue(ackResult.join().isPresent());
assertEquals(InvalidRequestException.class, ackResult.join().get().getClass());
}
@Test
public void testAcknowledgeWithAnotherMember() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records = memoryRecords(5, 5);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
// Cached data with offset 5-9 should exist.
assertEquals(1, sharePartition.cachedState().size());
assertNotNull(sharePartition.cachedState().get(5L));
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
"member-2",
Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 3))));
assertFalse(ackResult.isCompletedExceptionally());
assertTrue(ackResult.join().isPresent());
assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass());
}
@Test
public void testAcknowledgeWhenOffsetNotAcquired() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records = memoryRecords(5, 5);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
// Cached data with offset 5-9 should exist.
assertEquals(1, sharePartition.cachedState().size());
assertNotNull(sharePartition.cachedState().get(5L));
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 2))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
// Acknowledge the same batch again but with ACCEPT type.
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 1))));
assertFalse(ackResult.isCompletedExceptionally());
assertTrue(ackResult.join().isPresent());
assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass());
// Re-acquire the same batch and then acknowledge subset with ACCEPT type.
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(6, 8, Collections.singletonList((byte) 3))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
// Re-acknowledge the subset batch with REJECT type.
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(6, 8, Collections.singletonList((byte) 3))));
assertFalse(ackResult.isCompletedExceptionally());
assertTrue(ackResult.join().isPresent());
assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass());
}
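// A failing batch anywhere in the acknowledge request should roll back the state changes applied for the
// earlier batches in the same request, leaving all cached batches in ACQUIRED state.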
@Test
public void testAcknowledgeRollbackWithFullBatchError() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(5, 5);
MemoryRecords records2 = memoryRecords(5, 10);
MemoryRecords records3 = memoryRecords(5, 15);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
// Cached data with offsets 5-19 should exist.
assertEquals(3, sharePartition.cachedState().size());
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Arrays.asList(
new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(10, 14, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1)),
// Add another batch which should fail the request.
new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1))));
assertFalse(ackResult.isCompletedExceptionally());
assertTrue(ackResult.join().isPresent());
assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass());
// Check the state of the cache. The batches should remain in ACQUIRED state as the request was rolled back.
assertEquals(3, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
}
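// Same rollback behaviour as above, but the failing entry is a subset (16-19) of an already acknowledged batch.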
@Test
public void testAcknowledgeRollbackWithSubsetError() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records1 = memoryRecords(5, 5);
MemoryRecords records2 = memoryRecords(5, 10);
MemoryRecords records3 = memoryRecords(5, 15);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 0, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
// Cached data with offsets 5-19 should exist.
assertEquals(3, sharePartition.cachedState().size());
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Arrays.asList(
new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(10, 14, Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1)),
// Add another batch which should fail the request.
new ShareAcknowledgementBatch(16, 19, Collections.singletonList((byte) 1))));
assertFalse(ackResult.isCompletedExceptionally());
assertTrue(ackResult.join().isPresent());
assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass());
// Check the state of the cache. The batches should remain in ACQUIRED state as the request was rolled back.
assertEquals(3, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
// Though the last batch in the request is a subset of the cached batch, the offset state map is not created
// because the batch was no longer in ACQUIRED state after the previous batch's acknowledgement.
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
}
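// Releasing a subset of an acquired batch moves those offsets back to AVAILABLE (with an empty member id) and
// a subsequent acquire should hand out only the released offsets with an incremented delivery count.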
@Test
public void testAcquireReleasedRecord() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
MemoryRecords records = memoryRecords(5, 10);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 3, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
List<AcquiredRecords> acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(12, 13, Collections.singletonList((byte) 2))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
assertEquals(12, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState());
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
// Send the same fetch request batch again; only the 2 released offsets should be acquired.
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 3, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(), acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
}
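// Releasing offsets spanning multiple cached batches should split only the partially released batches into
// per-offset state, keep fully released batches as whole-batch state, and move the next fetch offset back to
// the earliest available offset.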
@Test
public void testAcquireReleasedRecordMultipleBatches() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
// First fetch request with 5 records starting from offset 10.
MemoryRecords records1 = memoryRecords(5, 10);
// Second fetch request with 5 records starting from offset 15.
MemoryRecords records2 = memoryRecords(5, 15);
// Third fetch request with 5 records starting from offset 23, gap of 3 offsets.
MemoryRecords records3 = memoryRecords(5, 23);
// Fourth fetch request with 5 records starting from offset 28.
MemoryRecords records4 = memoryRecords(5, 28);
CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 40, 3, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
List<AcquiredRecords> acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 30, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), acquiredRecordsList.toArray());
assertEquals(20, sharePartition.nextFetchOffset());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 30, 3, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), acquiredRecordsList.toArray());
assertEquals(28, sharePartition.nextFetchOffset());
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 30, 3, records4,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), acquiredRecordsList.toArray());
assertEquals(33, sharePartition.nextFetchOffset());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(28L).batchState());
assertNull(sharePartition.cachedState().get(10L).offsetState());
assertNull(sharePartition.cachedState().get(15L).offsetState());
assertNull(sharePartition.cachedState().get(23L).offsetState());
assertNull(sharePartition.cachedState().get(28L).offsetState());
CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(12, 30, Collections.singletonList((byte) 2))));
assertFalse(ackResult.isCompletedExceptionally());
assertFalse(ackResult.join().isPresent());
assertEquals(12, sharePartition.nextFetchOffset());
assertEquals(4, sharePartition.cachedState().size());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState());
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
assertNull(sharePartition.cachedState().get(15L).offsetState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(15L).batchMemberId());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(23L).batchState());
assertNull(sharePartition.cachedState().get(23L).offsetState());
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(23L).batchMemberId());
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(28L).batchState());
assertNotNull(sharePartition.cachedState().get(28L).offsetState());
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(28L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(29L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(30L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(31L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
expectedOffsetStateMap.put(32L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(28L).offsetState());
// Send the records1 batch again; only the 3 available offsets (12-14) should be acquired.
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 40, 3, records1,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
// Though the records2 batch is available to acquire, send the records3 batch instead; it should be acquired
// but the next fetch offset should not move.
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 40, 3, records3,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(), acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
// Acquire partial records from batch 2.
MemoryRecords subsetRecords = memoryRecords(2, 17);
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), acquiredRecordsList.toArray());
// Next fetch offset should not move.
assertEquals(15, sharePartition.nextFetchOffset());
// Acquire partial records from records4 to further test whether the next fetch offset moves accordingly once
// the complete records2 batch is also acquired.
subsetRecords = memoryRecords(1, 28);
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), acquiredRecordsList.toArray());
// Next fetch offset should not move.
assertEquals(15, sharePartition.nextFetchOffset());
// Try to acquire the complete records2 batch; though it is already partially acquired, the next fetch offset
// should move.
result = sharePartition.acquire(
MEMBER_ID,
new FetchPartitionData(Errors.NONE, 20, 3, records2,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
assertFalse(result.isCompletedExceptionally());
acquiredRecordsList = result.join();
// Offsets 15, 16 and 19 should be acquired.
List<AcquiredRecords> expectedAcquiredRecords = expectedAcquiredRecords(15, 16, 2);
expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
// Next fetch offset should move to the next available offset.
assertEquals(29, sharePartition.nextFetchOffset());
}
private MemoryRecords memoryRecords(int numOfRecords) {
return memoryRecords(numOfRecords, 0);
}
@@ -343,11 +947,23 @@ public class SharePartitionTest {
return acquiredRecordsList;
}
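// Builds the expected acquired records for re-delivered offsets: one AcquiredRecords entry per offset in the
// range, each carrying the given delivery count.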
private List<AcquiredRecords> expectedAcquiredRecords(long baseOffset, long lastOffset, int deliveryCount) {
List<AcquiredRecords> acquiredRecordsList = new ArrayList<>();
for (long i = baseOffset; i <= lastOffset; i++) {
acquiredRecordsList.add(new AcquiredRecords()
.setFirstOffset(i)
.setLastOffset(i)
.setDeliveryCount((short) deliveryCount));
}
return acquiredRecordsList;
}
private static class SharePartitionBuilder {
private int acquisitionLockTimeoutMs = 30000;
private int maxDeliveryCount = MAX_DELIVERY_COUNT;
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
private Persister persister = NoOpShareStatePersister.getInstance();
private ReplicaManager replicaManager = REPLICA_MANAGER;
private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) {
@@ -355,13 +971,18 @@ public class SharePartitionTest {
return this;
}
private SharePartitionBuilder withPersister(Persister persister) {
this.persister = persister;
return this;
}
public static SharePartitionBuilder builder() {
return new SharePartitionBuilder();
}
public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount,
acquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager);
}
}
}


@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* A no-op singleton implementation of the {@link Persister} interface.
*/
public class NoOpShareStatePersister implements Persister {
private NoOpShareStatePersister() {
}
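// Initialization-on-demand holder idiom: the JVM guarantees lazy, thread-safe creation of the singleton.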
private static final class InstanceHolder {
static final Persister INSTANCE = new NoOpShareStatePersister();
}
public static Persister getInstance() {
return InstanceHolder.INSTANCE;
}
@Override
public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) {
GroupTopicPartitionData<PartitionStateData> reqData = request.groupTopicPartitionData();
List<TopicData<PartitionErrorData>> resultArgs = new ArrayList<>();
for (TopicData<PartitionStateData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
.map(partStateData -> PartitionFactory.newPartitionErrorData(partStateData.partition(), PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
}
@Override
public CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) {
GroupTopicPartitionData<PartitionIdLeaderEpochData> reqData = request.groupTopicPartitionData();
List<TopicData<PartitionAllData>> resultArgs = new ArrayList<>();
// Fetch the topic and partition info from the request and return a valid but default response
// (keep the partition id and topic id from the request but initialize the other values as defaults).
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionAllData(
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
}
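// Like the other methods, writeState does not persist anything; it echoes the topic and partition ids from
// the request with a default (no error) result so callers treat the write as successful.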
@Override
public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) {
GroupTopicPartitionData<PartitionStateBatchData> reqData = request.groupTopicPartitionData();
List<TopicData<PartitionErrorData>> resultArgs = new ArrayList<>();
for (TopicData<PartitionStateBatchData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
.map(batch -> PartitionFactory.newPartitionErrorData(batch.partition(), PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new WriteShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
}
@Override
public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGroupStateParameters request) {
GroupTopicPartitionData<PartitionIdData> reqData = request.groupTopicPartitionData();
List<TopicData<PartitionErrorData>> resultArgs = new ArrayList<>();
for (TopicData<PartitionIdData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
.map(batch -> PartitionFactory.newPartitionErrorData(batch.partition(), PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new DeleteShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
}
@Override
public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) {
GroupTopicPartitionData<PartitionIdLeaderEpochData> reqData = request.groupTopicPartitionData();
List<TopicData<PartitionStateErrorData>> resultArgs = new ArrayList<>();
// Fetch the topic and partition info from the request and return a valid but default response
// (keep the partition id and topic id from the request but initialize the other values as defaults).
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionStateErrorData(
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
}
@Override
public void stop() {
//noop
}
}