diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 353d66e1fd8..891e37f8af7 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -45,6 +45,10 @@ import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
+import org.apache.kafka.server.share.fetch.DeliveryCountOps;
+import org.apache.kafka.server.share.fetch.InFlightBatch;
+import org.apache.kafka.server.share.fetch.InFlightState;
+import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
@@ -61,7 +65,6 @@ import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.Timer;
-import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.slf4j.Logger;
@@ -134,80 +137,6 @@ public class SharePartition {
FENCED
}
- /**
- * The RecordState is used to track the state of a record that has been fetched from the leader.
- * The state of the records determines if the records should be re-delivered, move the next fetch
- * offset, or be state persisted to disk.
- */
- // Visible for testing
- enum RecordState {
- AVAILABLE((byte) 0),
- ACQUIRED((byte) 1),
- ACKNOWLEDGED((byte) 2),
- ARCHIVED((byte) 4);
-
- public final byte id;
-
- RecordState(byte id) {
- this.id = id;
- }
-
- /**
- * Validates that the <code>newState</code> is one of the valid transition from the current
- * {@code RecordState}.
- *
- * @param newState State into which requesting to transition; must be non-null
- *
- * @return {@code RecordState} <code>newState</code> if validation succeeds. Returning
- * <code>newState</code> helps state assignment chaining.
- *
- * @throws IllegalStateException if the state transition validation fails.
- */
- public RecordState validateTransition(RecordState newState) throws IllegalStateException {
- Objects.requireNonNull(newState, "newState cannot be null");
- if (this == newState) {
- throw new IllegalStateException("The state transition is invalid as the new state is"
- + "the same as the current state");
- }
-
- if (this == ACKNOWLEDGED || this == ARCHIVED) {
- throw new IllegalStateException("The state transition is invalid from the current state: " + this);
- }
-
- if (this == AVAILABLE && newState != ACQUIRED) {
- throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
- }
-
- // Either the transition is from Available -> Acquired or from Acquired -> Available/
- // Acknowledged/Archived.
- return newState;
- }
-
- public static RecordState forId(byte id) {
- return switch (id) {
- case 0 -> AVAILABLE;
- case 1 -> ACQUIRED;
- case 2 -> ACKNOWLEDGED;
- case 4 -> ARCHIVED;
- default -> throw new IllegalArgumentException("Unknown record state id: " + id);
- };
- }
-
- public byte id() {
- return this.id;
- }
- }
-
- /**
- * The DeliveryCountOps is used to specify the behavior on the delivery count: increase, decrease,
- * or do nothing.
- */
- private enum DeliveryCountOps {
- INCREASE,
- DECREASE,
- NO_OP
- }
-
/**
* The group id of the share partition belongs to.
*/
@@ -307,6 +236,11 @@ public class SharePartition {
*/
private final SharePartitionMetrics sharePartitionMetrics;
+ /**
+ * The acquisition lock timeout handler is used to handle the acquisition lock timeout for the share partition.
+ */
+ private final AcquisitionLockTimeoutHandler timeoutHandler;
+
/**
* The share partition start offset specifies the partition start offset from which the records
* are cached in the cachedState of the sharePartition.
@@ -425,6 +359,7 @@ public class SharePartition {
this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition);
this.listener = listener;
this.sharePartitionMetrics = sharePartitionMetrics;
+ this.timeoutHandler = releaseAcquisitionLockOnTimeout();
this.registerGaugeMetrics();
}
@@ -528,8 +463,9 @@ public class SharePartition {
gapStartOffset = previousBatchLastOffset + 1;
}
previousBatchLastOffset = stateBatch.lastOffset();
- InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
- stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null);
+ InFlightBatch inFlightBatch = new InFlightBatch(timer, time, EMPTY_MEMBER_ID, stateBatch.firstOffset(),
+ stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(),
+ null, timeoutHandler, sharePartitionMetrics);
cachedState.put(stateBatch.firstOffset(), inFlightBatch);
sharePartitionMetrics.recordInFlightBatchMessageCount(stateBatch.lastOffset() - stateBatch.firstOffset() + 1);
}
@@ -649,7 +585,7 @@ public class SharePartition {
} else {
// The offset state is maintained hence find the next available offset.
for (Map.Entry offsetState : entry.getValue().offsetState().entrySet()) {
- if (offsetState.getValue().state == RecordState.AVAILABLE && !offsetState.getValue().hasOngoingStateTransition()) {
+ if (offsetState.getValue().state() == RecordState.AVAILABLE && !offsetState.getValue().hasOngoingStateTransition()) {
nextFetchOffset = offsetState.getKey();
break;
}
@@ -1034,7 +970,7 @@ public class SharePartition {
log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", inFlightBatch,
groupId, topicIdPartition);
- for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
+ for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) {
// Check if member id is the owner of the offset.
if (!offsetState.getValue().memberId().equals(memberId) && !offsetState.getValue().memberId().equals(EMPTY_MEMBER_ID)) {
@@ -1042,7 +978,7 @@ public class SharePartition {
+ " partition: {}-{}. Skipping offset.", memberId, offsetState.getKey(), inFlightBatch, groupId, topicIdPartition);
return Optional.empty();
}
- if (offsetState.getValue().state == RecordState.ACQUIRED) {
+ if (offsetState.getValue().state() == RecordState.ACQUIRED) {
// These records were fetched but they were not actually delivered to the client.
InFlightState updateResult = offsetState.getValue().startStateTransition(
offsetState.getKey() < startOffset ? RecordState.ARCHIVED : recordState,
@@ -1060,11 +996,11 @@ public class SharePartition {
// Successfully updated the state of the offset.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
- updateResult.state.id, (short) updateResult.deliveryCount));
+ updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
- if (updateResult.state != RecordState.ARCHIVED) {
+ if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@@ -1105,11 +1041,11 @@ public class SharePartition {
// Successfully updated the state of the batch.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
- updateResult.state.id, (short) updateResult.deliveryCount));
+ updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
- if (updateResult.state != RecordState.ARCHIVED) {
+ if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@@ -1293,7 +1229,7 @@ public class SharePartition {
// No further offsets to process.
break;
}
- if (offsetState.getValue().state != initialState) {
+ if (offsetState.getValue().state() != initialState) {
continue;
}
@@ -1317,7 +1253,7 @@ public class SharePartition {
// Change the state of complete batch since the same state exists for the entire inFlight batch.
inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
if (initialState == RecordState.ACQUIRED) {
- inFlightBatch.batchState.cancelAndClearAcquisitionLockTimeoutTask();
+ inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
}
return true;
}
@@ -1610,12 +1546,16 @@ public class SharePartition {
AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
cachedState.put(acquiredRecords.firstOffset(), new InFlightBatch(
+ timer,
+ time,
memberId,
acquiredRecords.firstOffset(),
acquiredRecords.lastOffset(),
RecordState.ACQUIRED,
1,
- timerTask));
+ timerTask,
+ timeoutHandler,
+ sharePartitionMetrics));
// Update the in-flight batch message count metrics for the share partition.
sharePartitionMetrics.recordInFlightBatchMessageCount(acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
});
@@ -1635,7 +1575,7 @@ public class SharePartition {
lock.writeLock().lock();
int acquiredCount = 0;
try {
- for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
+ for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) {
// For the first batch which might have offsets prior to the request base
// offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
if (offsetState.getKey() < requestFirstOffset) {
@@ -1647,7 +1587,7 @@ public class SharePartition {
break;
}
- if (offsetState.getValue().state != RecordState.AVAILABLE || offsetState.getValue().hasOngoingStateTransition()) {
+ if (offsetState.getValue().state() != RecordState.AVAILABLE || offsetState.getValue().hasOngoingStateTransition()) {
log.trace("The offset {} is not available in share partition: {}-{}, skipping: {}",
offsetState.getKey(), groupId, topicIdPartition, inFlightBatch);
continue;
@@ -1670,7 +1610,7 @@ public class SharePartition {
result.add(new AcquiredRecords()
.setFirstOffset(offsetState.getKey())
.setLastOffset(offsetState.getKey())
- .setDeliveryCount((short) offsetState.getValue().deliveryCount));
+ .setDeliveryCount((short) offsetState.getValue().deliveryCount()));
acquiredCount++;
}
} finally {
@@ -1758,7 +1698,7 @@ public class SharePartition {
NavigableMap subMap = cachedState.subMap(floorOffset.getKey(), true, batch.lastOffset(), true);
// Validate if the request batch has the first offset greater than the last offset of the last
// fetched cached batch, then there will be no offsets in the request that can be acknowledged.
- if (subMap.lastEntry().getValue().lastOffset < batch.firstOffset()) {
+ if (subMap.lastEntry().getValue().lastOffset() < batch.firstOffset()) {
log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
throw new InvalidRequestException("Batch record not found. The first offset in request is past acquired records.");
}
@@ -1766,7 +1706,7 @@ public class SharePartition {
// Validate if the request batch has the last offset greater than the last offset of
// the last fetched cached batch, then there will be offsets in the request than cannot
// be found in the fetched batches.
- if (batch.lastOffset() > subMap.lastEntry().getValue().lastOffset) {
+ if (batch.lastOffset() > subMap.lastEntry().getValue().lastOffset()) {
log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
throw new InvalidRequestException("Batch record not found. The last offset in request is past acquired records.");
}
@@ -1888,7 +1828,7 @@ public class SharePartition {
// Fetch the first record state from the map to be used as default record state in case the
// offset record state is not provided by client.
RecordState recordStateDefault = recordStateMap.get(batch.firstOffset());
- for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
+ for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) {
// 1. For the first batch which might have offsets prior to the request base
// offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
@@ -1902,7 +1842,7 @@ public class SharePartition {
break;
}
- if (offsetState.getValue().state != RecordState.ACQUIRED) {
+ if (offsetState.getValue().state() != RecordState.ACQUIRED) {
log.debug("The offset is not acquired, offset: {} batch: {} for the share"
+ " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId,
topicIdPartition);
@@ -1911,7 +1851,7 @@ public class SharePartition {
}
// Check if member id is the owner of the offset.
- if (!offsetState.getValue().memberId.equals(memberId)) {
+ if (!offsetState.getValue().memberId().equals(memberId)) {
log.debug("Member {} is not the owner of offset: {} in batch: {} for the share"
+ " partition: {}-{}", memberId, offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition);
@@ -1940,11 +1880,11 @@ public class SharePartition {
// Successfully updated the state of the offset.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
- updateResult.state.id, (short) updateResult.deliveryCount));
+ updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
- && updateResult.state != RecordState.ARCHIVED) {
+ && updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@@ -1994,13 +1934,13 @@ public class SharePartition {
// Successfully updated the state of the batch.
updatedStates.add(updateResult);
stateBatches.add(
- new PersisterStateBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset,
- updateResult.state.id, (short) updateResult.deliveryCount));
+ new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
+ updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the nextFetchOffset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
- && updateResult.state != RecordState.ARCHIVED) {
+ && updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
} finally {
@@ -2086,7 +2026,7 @@ public class SharePartition {
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
- if (state.state == RecordState.AVAILABLE) {
+ if (state.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
});
@@ -2253,7 +2193,7 @@ public class SharePartition {
}
lastOffsetAcknowledged = inFlightBatch.lastOffset();
} else {
- for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
+ for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) {
if (offsetState.getValue().hasOngoingStateTransition() || !isRecordStateAcknowledged(offsetState.getValue().state())) {
return lastOffsetAcknowledged;
}
@@ -2465,11 +2405,11 @@ public class SharePartition {
return;
}
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
- updateResult.state.id, (short) updateResult.deliveryCount));
+ updateResult.state().id(), (short) updateResult.deliveryCount()));
// Cancel the acquisition lock timeout task for the batch since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
- if (updateResult.state != RecordState.ARCHIVED) {
+ if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
return;
@@ -2494,7 +2434,7 @@ public class SharePartition {
// No further offsets to process.
break;
}
- if (offsetState.getValue().state != RecordState.ACQUIRED) {
+ if (offsetState.getValue().state() != RecordState.ACQUIRED) {
log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {}"
+ " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition, memberId);
@@ -2512,11 +2452,11 @@ public class SharePartition {
continue;
}
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
- updateResult.state.id, (short) updateResult.deliveryCount));
+ updateResult.state().id(), (short) updateResult.deliveryCount()));
// Cancel the acquisition lock timeout task for the offset since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
- if (updateResult.state != RecordState.ARCHIVED) {
+ if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@@ -2838,300 +2778,6 @@ public class SharePartition {
}
}
- /**
- * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
- */
- final class InFlightBatch {
- // The offset of the first record in the batch that is fetched from the log.
- private final long firstOffset;
- // The last offset of the batch that is fetched from the log.
- private final long lastOffset;
-
- // The batch state of the fetched records. If the offset state map is empty then batchState
- // determines the state of the complete batch else individual offset determines the state of
- // the respective records.
- private InFlightState batchState;
-
- // The offset state map is used to track the state of the records per offset. However, the
- // offset state map is only required when the state of the offsets within same batch are
- // different. The states can be different when explicit offset acknowledgment is done which
- // is different from the batch state.
- private NavigableMap offsetState;
-
- InFlightBatch(String memberId, long firstOffset, long lastOffset, RecordState state,
- int deliveryCount, AcquisitionLockTimerTask acquisitionLockTimeoutTask
- ) {
- this.firstOffset = firstOffset;
- this.lastOffset = lastOffset;
- this.batchState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
- }
-
- // Visible for testing.
- long firstOffset() {
- return firstOffset;
- }
-
- // Visible for testing.
- long lastOffset() {
- return lastOffset;
- }
-
- // Visible for testing.
- RecordState batchState() {
- return inFlightState().state;
- }
-
- // Visible for testing.
- String batchMemberId() {
- if (batchState == null) {
- throw new IllegalStateException("The batch member id is not available as the offset state is maintained");
- }
- return batchState.memberId;
- }
-
- // Visible for testing.
- int batchDeliveryCount() {
- if (batchState == null) {
- throw new IllegalStateException("The batch delivery count is not available as the offset state is maintained");
- }
- return batchState.deliveryCount;
- }
-
- // Visible for testing.
- AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
- return inFlightState().acquisitionLockTimeoutTask;
- }
-
- // Visible for testing.
- NavigableMap offsetState() {
- return offsetState;
- }
-
- private InFlightState inFlightState() {
- if (batchState == null) {
- throw new IllegalStateException("The batch state is not available as the offset state is maintained");
- }
- return batchState;
- }
-
- // Visible for testing.
- boolean batchHasOngoingStateTransition() {
- return inFlightState().hasOngoingStateTransition();
- }
-
- private void archiveBatch(String newMemberId) {
- inFlightState().archive(newMemberId);
- }
-
- private InFlightState tryUpdateBatchState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
- if (batchState == null) {
- throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
- }
- return batchState.tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
- }
-
- private InFlightState startBatchStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount,
- String newMemberId) {
- if (batchState == null) {
- throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
- }
- return batchState.startStateTransition(newState, ops, maxDeliveryCount, newMemberId);
- }
-
- private void maybeInitializeOffsetStateUpdate() {
- if (offsetState == null) {
- offsetState = new ConcurrentSkipListMap<>();
- // The offset state map is not initialized hence initialize the state of the offsets
- // from the first offset to the last offset. Mark the batch inflightState to null as
- // the state of the records is maintained in the offset state map now.
- for (long offset = this.firstOffset; offset <= this.lastOffset; offset++) {
- if (batchState.acquisitionLockTimeoutTask != null) {
- // The acquisition lock timeout task is already scheduled for the batch, hence we need to schedule
- // the acquisition lock timeout task for the offset as well.
- long delayMs = batchState.acquisitionLockTimeoutTask.expirationMs() - time.hiResClockMs();
- AcquisitionLockTimerTask timerTask = acquisitionLockTimerTask(batchState.memberId, offset, offset, delayMs);
- offsetState.put(offset, new InFlightState(batchState.state, batchState.deliveryCount, batchState.memberId, timerTask));
- timer.add(timerTask);
- } else {
- offsetState.put(offset, new InFlightState(batchState.state, batchState.deliveryCount, batchState.memberId));
- }
- }
- // Cancel the acquisition lock timeout task for the batch as the offset state is maintained.
- if (batchState.acquisitionLockTimeoutTask != null) {
- batchState.cancelAndClearAcquisitionLockTimeoutTask();
- }
- batchState = null;
- }
- }
-
- private void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
- inFlightState().acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
- }
-
- @Override
- public String toString() {
- return "InFlightBatch(" +
- "firstOffset=" + firstOffset +
- ", lastOffset=" + lastOffset +
- ", inFlightState=" + batchState +
- ", offsetState=" + ((offsetState == null) ? "null" : offsetState) +
- ")";
- }
- }
-
- /**
- * The InFlightState is used to track the state and delivery count of a record that has been
- * fetched from the leader. The state of the record is used to determine if the record should
- * be re-deliver or if it can be acknowledged or archived.
- */
- static final class InFlightState {
-
- // The state of the fetch batch records.
- private RecordState state;
- // The number of times the records has been delivered to the client.
- private int deliveryCount;
- // The member id of the client that is fetching/acknowledging the record.
- private String memberId;
- // The state of the records before the transition. In case we need to revert an in-flight state, we revert the above
- // attributes of InFlightState to this state, namely - state, deliveryCount and memberId.
- private InFlightState rollbackState;
- // The timer task for the acquisition lock timeout.
- private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
-
-
- InFlightState(RecordState state, int deliveryCount, String memberId) {
- this(state, deliveryCount, memberId, null);
- }
-
- InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
- this.state = state;
- this.deliveryCount = deliveryCount;
- this.memberId = memberId;
- this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
- }
-
- // Visible for testing.
- RecordState state() {
- return state;
- }
-
- String memberId() {
- return memberId;
- }
-
- // Visible for testing.
- TimerTask acquisitionLockTimeoutTask() {
- return acquisitionLockTimeoutTask;
- }
-
- void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) throws IllegalArgumentException {
- if (this.acquisitionLockTimeoutTask != null) {
- throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
- }
- this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
- }
-
- void cancelAndClearAcquisitionLockTimeoutTask() {
- acquisitionLockTimeoutTask.cancel();
- acquisitionLockTimeoutTask = null;
- }
-
- // Visible for testing.
- boolean hasOngoingStateTransition() {
- if (rollbackState == null) {
- // This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
- // been committed.
- return false;
- }
- return rollbackState.state != null;
- }
-
- /**
- * Try to update the state of the records. The state of the records can only be updated if the
- * new state is allowed to be transitioned from old state. The delivery count is not changed
- * if the state update is unsuccessful.
- *
- * @param newState The new state of the records.
- * @param ops The behavior on the delivery count.
- *
- * @return {@code InFlightState} if update succeeds, null otherwise. Returning state
- * helps update chaining.
- */
- private InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
- try {
- if (newState == RecordState.AVAILABLE && ops != DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
- newState = RecordState.ARCHIVED;
- }
- state = state.validateTransition(newState);
- if (newState != RecordState.ARCHIVED) {
- deliveryCount = updatedDeliveryCount(ops);
- }
- memberId = newMemberId;
- return this;
- } catch (IllegalStateException e) {
- log.error("Failed to update state of the records", e);
- rollbackState = null;
- return null;
- }
- }
-
- private int updatedDeliveryCount(DeliveryCountOps ops) {
- return switch (ops) {
- case INCREASE -> deliveryCount + 1;
- case DECREASE -> deliveryCount - 1;
- // do nothing
- case NO_OP -> deliveryCount;
- };
- }
-
- private void archive(String newMemberId) {
- state = RecordState.ARCHIVED;
- memberId = newMemberId;
- }
-
- private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
- rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
- return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
- }
-
- private void completeStateTransition(boolean commit) {
- if (commit) {
- rollbackState = null;
- return;
- }
- state = rollbackState.state;
- deliveryCount = rollbackState.deliveryCount;
- memberId = rollbackState.memberId;
- rollbackState = null;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(state, deliveryCount, memberId);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- InFlightState that = (InFlightState) o;
- return state == that.state && deliveryCount == that.deliveryCount && memberId.equals(that.memberId);
- }
-
- @Override
- public String toString() {
- return "InFlightState(" +
- "state=" + state.toString() +
- ", deliveryCount=" + deliveryCount +
- ", memberId=" + memberId +
- ")";
- }
- }
-
/**
* FetchOffsetMetadata class is used to cache offset and its log metadata.
*/
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 5059b4c892e..a0bdfee7b5f 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -17,8 +17,6 @@
package kafka.server.share;
import kafka.server.ReplicaManager;
-import kafka.server.share.SharePartition.InFlightState;
-import kafka.server.share.SharePartition.RecordState;
import kafka.server.share.SharePartition.SharePartitionState;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
@@ -57,6 +55,8 @@ import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.InFlightState;
+import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.share.persister.NoOpStatePersister;
@@ -140,45 +140,6 @@ public class SharePartitionTest {
sharePartitionMetrics.close();
}
- @Test
- public void testRecordStateValidateTransition() {
- // Null check.
- assertThrows(NullPointerException.class, () -> RecordState.AVAILABLE.validateTransition(null));
- // Same state transition check.
- assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.AVAILABLE));
- assertThrows(IllegalStateException.class, () -> RecordState.ACQUIRED.validateTransition(RecordState.ACQUIRED));
- assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACKNOWLEDGED));
- assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
- // Invalid state transition to any other state from Acknowledged state.
- assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.AVAILABLE));
- assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACQUIRED));
- assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ARCHIVED));
- // Invalid state transition to any other state from Archived state.
- assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.AVAILABLE));
- assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACKNOWLEDGED));
- assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
- // Invalid state transition to any other state from Available state other than Acquired.
- assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ACKNOWLEDGED));
- assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ARCHIVED));
-
- // Successful transition from Available to Acquired.
- assertEquals(RecordState.ACQUIRED, RecordState.AVAILABLE.validateTransition(RecordState.ACQUIRED));
- // Successful transition from Acquired to any state.
- assertEquals(RecordState.AVAILABLE, RecordState.ACQUIRED.validateTransition(RecordState.AVAILABLE));
- assertEquals(RecordState.ACKNOWLEDGED, RecordState.ACQUIRED.validateTransition(RecordState.ACKNOWLEDGED));
- assertEquals(RecordState.ARCHIVED, RecordState.ACQUIRED.validateTransition(RecordState.ARCHIVED));
- }
-
- @Test
- public void testRecordStateForId() {
- assertEquals(RecordState.AVAILABLE, RecordState.forId((byte) 0));
- assertEquals(RecordState.ACQUIRED, RecordState.forId((byte) 1));
- assertEquals(RecordState.ACKNOWLEDGED, RecordState.forId((byte) 2));
- assertEquals(RecordState.ARCHIVED, RecordState.forId((byte) 4));
- // Invalid check.
- assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5));
- }
-
@Test
public void testMaybeInitialize() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/DeliveryCountOps.java b/server/src/main/java/org/apache/kafka/server/share/fetch/DeliveryCountOps.java
new file mode 100644
index 00000000000..b69927ae0d4
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/DeliveryCountOps.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+/**
+ * The DeliveryCountOps specifies how a state update changes the delivery count of a record:
+ * increase it by one, decrease it by one, or leave it unchanged (no-op).
+ */
+public enum DeliveryCountOps {
+ INCREASE, DECREASE, NO_OP
+}
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
new file mode 100644
index 00000000000..c6a16c5056e
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
+import org.apache.kafka.server.util.timer.Timer;
+
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
+ */
+public class InFlightBatch {
+    // The timer is used to schedule the acquisition lock timeout task for the batch.
+    private final Timer timer;
+    // The time is used to get the current time in milliseconds.
+    private final Time time;
+    // The offset of the first record in the batch that is fetched from the log.
+    private final long firstOffset;
+    // The last offset of the batch that is fetched from the log.
+    private final long lastOffset;
+    // The acquisition lock timeout handler is used to release the acquired records when the acquisition
+    // lock timeout is reached.
+    private final AcquisitionLockTimeoutHandler timeoutHandler;
+    // The share partition metrics are used to track the metrics related to the share partition.
+    private final SharePartitionMetrics sharePartitionMetrics;
+
+    // The batch state of the fetched records. It determines the state of the complete batch until the
+    // offset state map is initialized; from then on it is null and the individual offsets determine
+    // the state of the respective records.
+    private InFlightState batchState;
+
+    // The offset state map is used to track the state of the records per offset. However, the
+    // offset state map is only required when the state of the offsets within same batch are
+    // different. The states can be different when explicit offset acknowledgment is done which
+    // is different from the batch state.
+    private NavigableMap<Long, InFlightState> offsetState;
+
+    public InFlightBatch(
+        Timer timer,
+        Time time,
+        String memberId,
+        long firstOffset,
+        long lastOffset,
+        RecordState state,
+        int deliveryCount,
+        AcquisitionLockTimerTask acquisitionLockTimeoutTask,
+        AcquisitionLockTimeoutHandler timeoutHandler,
+        SharePartitionMetrics sharePartitionMetrics
+    ) {
+        this.timer = timer;
+        this.time = time;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+        this.timeoutHandler = timeoutHandler;
+        this.sharePartitionMetrics = sharePartitionMetrics;
+        this.batchState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+    }
+
+    /**
+     * @return the first offset of the batch.
+     */
+    public long firstOffset() {
+        return firstOffset;
+    }
+
+    /**
+     * @return the last offset of the batch.
+     */
+    public long lastOffset() {
+        return lastOffset;
+    }
+
+    /**
+     * @return the state of the batch.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public RecordState batchState() {
+        return inFlightState().state();
+    }
+
+    /**
+     * @return the member id of the batch.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public String batchMemberId() {
+        return inFlightState().memberId();
+    }
+
+    /**
+     * @return the delivery count of the batch.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public int batchDeliveryCount() {
+        return inFlightState().deliveryCount();
+    }
+
+    /**
+     * @return the acquisition lock timeout task for the batch.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
+        return inFlightState().acquisitionLockTimeoutTask();
+    }
+
+    /**
+     * @return the offset state map which maintains the state of the records per offset, or null if it is not initialized.
+     */
+    public NavigableMap<Long, InFlightState> offsetState() {
+        return offsetState;
+    }
+
+    /**
+     * Cancel the acquisition lock timeout task of the batch and clear the reference to it.
+     * The call is delegated to the batch level in-flight state, which holds the task for the
+     * complete batch.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public void cancelAndClearAcquisitionLockTimeoutTask() {
+        inFlightState().cancelAndClearAcquisitionLockTimeoutTask();
+    }
+
+    /**
+     * @return true if the batch has an ongoing state transition, false otherwise.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public boolean batchHasOngoingStateTransition() {
+        return inFlightState().hasOngoingStateTransition();
+    }
+
+    /**
+     * Archive the batch state. This is used to mark the batch as archived and no further updates
+     * are allowed to the batch state.
+     * @param newMemberId The new member id for the records.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public void archiveBatch(String newMemberId) {
+        inFlightState().archive(newMemberId);
+    }
+
+    /**
+     * Try to update the batch state. The state of the batch can only be updated if the new state is allowed
+     * to be transitioned from old state. The delivery count is not changed if the state update is unsuccessful.
+     *
+     * @param newState The new state of the records.
+     * @param ops The behavior on the delivery count.
+     * @param maxDeliveryCount The maximum delivery count for the records.
+     * @param newMemberId The new member id for the records.
+     * @return {@code InFlightState} if update succeeds, null otherwise. Returning state helps update chaining.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public InFlightState tryUpdateBatchState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+        return inFlightState().tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+    }
+
+    /**
+     * Start a state transition for the batch. The current batch state is saved as the rollback state
+     * before the update is attempted, so that the transition can later be committed or rolled back.
+     *
+     * @param newState The new state of the records.
+     * @param ops The behavior on the delivery count.
+     * @param maxDeliveryCount The maximum delivery count for the records.
+     * @param newMemberId The new member id for the records.
+     * @return {@code InFlightState} if update succeeds, null otherwise. Returning state helps update chaining.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     */
+    public InFlightState startBatchStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount,
+        String newMemberId
+    ) {
+        return inFlightState().startStateTransition(newState, ops, maxDeliveryCount, newMemberId);
+    }
+
+    /**
+     * Initialize the offset state map if it is not already initialized. This is used to maintain the state of the
+     * records per offset when the state of the offsets within same batch are different.
+     */
+    public void maybeInitializeOffsetStateUpdate() {
+        if (offsetState == null) {
+            offsetState = new ConcurrentSkipListMap<>();
+            // The offset state map is not initialized hence initialize the state of the offsets
+            // from the first offset to the last offset. Mark the batch inflightState to null as
+            // the state of the records is maintained in the offset state map now.
+            for (long offset = this.firstOffset; offset <= this.lastOffset; offset++) {
+                if (batchState.acquisitionLockTimeoutTask() != null) {
+                    // The acquisition lock timeout task is already scheduled for the batch, hence we need to schedule
+                    // the acquisition lock timeout task for the offset as well, with the time remaining on the batch task.
+                    long delayMs = batchState.acquisitionLockTimeoutTask().expirationMs() - time.hiResClockMs();
+                    AcquisitionLockTimerTask timerTask = acquisitionLockTimerTask(batchState.memberId(), offset, offset, delayMs);
+                    offsetState.put(offset, new InFlightState(batchState.state(), batchState.deliveryCount(), batchState.memberId(), timerTask));
+                    timer.add(timerTask);
+                } else {
+                    offsetState.put(offset, new InFlightState(batchState.state(), batchState.deliveryCount(), batchState.memberId()));
+                }
+            }
+            // Cancel the acquisition lock timeout task for the batch as the offset state is maintained.
+            if (batchState.acquisitionLockTimeoutTask() != null) {
+                batchState.cancelAndClearAcquisitionLockTimeoutTask();
+            }
+            batchState = null;
+        }
+    }
+
+    /**
+     * Set the acquisition lock timeout task for the batch. An already set task cannot be overridden.
+     *
+     * @param acquisitionLockTimeoutTask The new acquisition lock timeout task for the batch.
+     * @throws IllegalStateException if the offset state is maintained and the batch state is not available.
+     * @throws IllegalArgumentException if an acquisition lock timeout task is already set for the batch.
+     */
+    public void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
+        inFlightState().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
+    }
+
+    private InFlightState inFlightState() {
+        if (batchState == null) {
+            throw new IllegalStateException("The batch state is not available as the offset state is maintained");
+        }
+        return batchState;
+    }
+
+    private AcquisitionLockTimerTask acquisitionLockTimerTask(
+        String memberId,
+        long firstOffset,
+        long lastOffset,
+        long delayMs
+    ) {
+        return new AcquisitionLockTimerTask(time, delayMs, memberId, firstOffset, lastOffset, timeoutHandler, sharePartitionMetrics);
+    }
+
+    @Override
+    public String toString() {
+        return "InFlightBatch(" +
+            "firstOffset=" + firstOffset +
+            ", lastOffset=" + lastOffset +
+            ", inFlightState=" + batchState +
+            ", offsetState=" + offsetState +
+            ")";
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
new file mode 100644
index 00000000000..97b2d55ed0b
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * The InFlightState is used to track the state and delivery count of a record that has been
+ * fetched from the leader. The state of the record is used to determine if the record should
+ * be re-delivered or if it can be acknowledged or archived.
+ */
+public class InFlightState {
+
+    private static final Logger log = LoggerFactory.getLogger(InFlightState.class);
+
+    // The state of the fetch batch records.
+    private RecordState state;
+    // The number of times the records have been delivered to the client.
+    private int deliveryCount;
+    // The member id of the client that is fetching/acknowledging the record.
+    private String memberId;
+    // The state of the records before the transition. In case we need to revert an in-flight state, we revert the above
+    // attributes of InFlightState to this state, namely - state, deliveryCount and memberId.
+    private InFlightState rollbackState;
+    // The timer task for the acquisition lock timeout.
+    private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+
+    // Visible for testing.
+    public InFlightState(RecordState state, int deliveryCount, String memberId) {
+        this(state, deliveryCount, memberId, null);
+    }
+
+    InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
+        this.state = state;
+        this.deliveryCount = deliveryCount;
+        this.memberId = memberId;
+        this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
+    }
+
+    /**
+     * @return The current state of the record.
+     */
+    public RecordState state() {
+        return state;
+    }
+
+    /**
+     * @return The number of times the record has been delivered.
+     */
+    public int deliveryCount() {
+        return deliveryCount;
+    }
+
+    /**
+     * @return The member id of the client that is fetching/acknowledging the record.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return The timer task for the acquisition lock timeout.
+     */
+    public AcquisitionLockTimerTask acquisitionLockTimeoutTask() {
+        return acquisitionLockTimeoutTask;
+    }
+
+    /**
+     * Update the acquisition lock timeout task. This method is used to set the acquisition lock
+     * timeout task for the record. If there is already an acquisition lock timeout task set,
+     * it throws an IllegalArgumentException.
+     *
+     * @param acquisitionLockTimeoutTask The new acquisition lock timeout task to set.
+     * @throws IllegalArgumentException if there is already an acquisition lock timeout task set.
+     */
+    public void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) throws IllegalArgumentException {
+        if (this.acquisitionLockTimeoutTask != null) {
+            throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
+        }
+        this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
+    }
+
+    /**
+     * Cancel the acquisition lock timeout task if it exists and clear the reference to it; no-op otherwise.
+     */
+    public void cancelAndClearAcquisitionLockTimeoutTask() {
+        if (acquisitionLockTimeoutTask != null) {
+            acquisitionLockTimeoutTask.cancel();
+        }
+        acquisitionLockTimeoutTask = null;
+    }
+
+    /**
+     * Check if there is an ongoing state transition for the records.
+     * This method checks if the rollbackState is not null, which indicates that
+     * there has been a state transition that has not been committed yet.
+     *
+     * @return true if there is an ongoing state transition, false otherwise.
+     */
+    public boolean hasOngoingStateTransition() {
+        if (rollbackState == null) {
+            // This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
+            // been committed.
+            return false;
+        }
+        return rollbackState.state != null;
+    }
+
+    /**
+     * Try to update the state of the records. The update succeeds only if the transition from the current
+     * state is valid. A transition to AVAILABLE is turned into ARCHIVED once the delivery count has reached
+     * maxDeliveryCount, unless ops is DECREASE. The delivery count is left unchanged on failure or on ARCHIVED.
+     *
+     * @param newState The new state of the records.
+     * @param ops The behavior on the delivery count.
+     * @param maxDeliveryCount The maximum delivery count for the record.
+     * @param newMemberId The member id of the client that is fetching/acknowledging the record.
+     *
+     * @return {@code InFlightState} if update succeeds, null otherwise. Returning state
+     *         helps update chaining.
+     */
+    public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+        try {
+            if (newState == RecordState.AVAILABLE && ops != DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
+                newState = RecordState.ARCHIVED;
+            }
+            state = state.validateTransition(newState);
+            if (newState != RecordState.ARCHIVED) {
+                deliveryCount = updatedDeliveryCount(ops);
+            }
+            memberId = newMemberId;
+            return this;
+        } catch (IllegalStateException e) {
+            log.error("Failed to update state of the records", e);
+            rollbackState = null;
+            return null;
+        }
+    }
+
+    /**
+     * Archive the record by setting its state to ARCHIVED and its member id to the supplied one. The
+     * transition is not validated and the acquisition lock timeout task is left untouched.
+     * @param newMemberId The member id to record against the archived record.
+     */
+    public void archive(String newMemberId) {
+        state = RecordState.ARCHIVED;
+        memberId = newMemberId;
+    }
+
+    /**
+     * Start a state transition for the records. This method is used to start a state transition
+     * for the records. It creates a copy of the current state and sets it as the rollback state.
+     * If the state transition is successful, it returns the updated state.
+     *
+     * @param newState The new state of the records.
+     * @param ops The behavior on the delivery count.
+     * @param maxDeliveryCount The maximum delivery count for the record.
+     * @param newMemberId The member id of the client that is fetching/acknowledging the record.
+     *
+     * @return {@code InFlightState} if update succeeds, null otherwise. Returning state
+     *         helps update chaining.
+     */
+    public InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+        rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+        return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+    }
+
+    /**
+     * Complete the state transition for the records. If commit is true, the rollback state is cleared and
+     * the current state is retained. Otherwise the state, delivery count and member id are restored from
+     * the rollback state, which must therefore exist, before it is cleared.
+     *
+     * @param commit If true, commits the state transition, otherwise rolls back.
+     */
+    public void completeStateTransition(boolean commit) {
+        if (commit) {
+            rollbackState = null;
+            return;
+        }
+        state = rollbackState.state;
+        deliveryCount = rollbackState.deliveryCount;
+        memberId = rollbackState.memberId;
+        rollbackState = null;
+    }
+
+    private int updatedDeliveryCount(DeliveryCountOps ops) {
+        return switch (ops) {
+            case INCREASE -> deliveryCount + 1;
+            case DECREASE -> deliveryCount - 1;
+            // do nothing
+            case NO_OP -> deliveryCount;
+        };
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(state, deliveryCount, memberId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        InFlightState that = (InFlightState) o;
+        return state == that.state && deliveryCount == that.deliveryCount && Objects.equals(memberId, that.memberId);
+    }
+
+    @Override
+    public String toString() {
+        return "InFlightState(" +
+            "state=" + state +
+            ", deliveryCount=" + deliveryCount +
+            ", memberId=" + memberId +
+            ")";
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java b/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
new file mode 100644
index 00000000000..2870f4ad8e1
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import java.util.Objects;
+
+/**
+ * The RecordState is used to track the state of a record that has been fetched from the leader.
+ * The state of the records determines if the records should be re-delivered, move the next fetch
+ * offset, or be state persisted to disk.
+ */
+public enum RecordState {
+    AVAILABLE((byte) 0),
+    ACQUIRED((byte) 1),
+    ACKNOWLEDGED((byte) 2),
+    ARCHIVED((byte) 4);
+
+    public final byte id;
+
+    RecordState(byte id) {
+        this.id = id;
+    }
+
+    /**
+     * Validates that the {@code newState} is one of the valid transition from the current
+     * {@code RecordState}.
+     * @param newState State into which requesting to transition; must be non-null
+     * @return {@code RecordState} {@code newState} if validation succeeds. Returning
+     *         {@code newState} helps state assignment chaining.
+     * @throws IllegalStateException if the state transition validation fails.
+     * @throws NullPointerException if {@code newState} is null.
+     */
+    public RecordState validateTransition(RecordState newState) throws IllegalStateException {
+        Objects.requireNonNull(newState, "newState cannot be null");
+        if (this == newState) {
+            throw new IllegalStateException("The state transition is invalid as the new state is "
+                + "the same as the current state");
+        }
+
+        if (this == ACKNOWLEDGED || this == ARCHIVED) {
+            throw new IllegalStateException("The state transition is invalid from the current state: " + this);
+        }
+
+        if (this == AVAILABLE && newState != ACQUIRED) {
+            throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
+        }
+
+        // Either the transition is from Available -> Acquired or from Acquired -> Available/
+        // Acknowledged/Archived.
+        return newState;
+    }
+
+    /** Returns the state whose id matches; throws IllegalArgumentException if the id is unknown. */
+    public static RecordState forId(byte id) {
+        return switch (id) {
+            case 0 -> AVAILABLE;
+            case 1 -> ACQUIRED;
+            case 2 -> ACKNOWLEDGED;
+            case 4 -> ARCHIVED;
+            default -> throw new IllegalArgumentException("Unknown record state id: " + id);
+        };
+    }
+
+    /** Returns the byte id of this state, the same value as the public {@code id} field. */
+    public byte id() {
+        return this.id;
+    }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/RecordStateTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/RecordStateTest.java
new file mode 100644
index 00000000000..10282f34414
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/share/fetch/RecordStateTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RecordStateTest {
+
+    @Test
+    public void testRecordStateValidateTransition() {
+        // Null check.
+        assertThrows(NullPointerException.class, () -> RecordState.AVAILABLE.validateTransition(null));
+        // Same state transition check.
+        assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.AVAILABLE));
+        assertThrows(IllegalStateException.class, () -> RecordState.ACQUIRED.validateTransition(RecordState.ACQUIRED));
+        assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACKNOWLEDGED));
+        assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
+        // Invalid state transition to any other state from Acknowledged state.
+        assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.AVAILABLE));
+        assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACQUIRED));
+        assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ARCHIVED));
+        // Invalid state transition to any other state from Archived state.
+        assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.AVAILABLE));
+        assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACKNOWLEDGED));
+        assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACQUIRED));
+        // Invalid state transition to any other state from Available state other than Acquired.
+        assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ACKNOWLEDGED));
+        assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ARCHIVED));
+
+        // Successful transition from Available to Acquired.
+        assertEquals(RecordState.ACQUIRED, RecordState.AVAILABLE.validateTransition(RecordState.ACQUIRED));
+        // Successful transition from Acquired to any state.
+        assertEquals(RecordState.AVAILABLE, RecordState.ACQUIRED.validateTransition(RecordState.AVAILABLE));
+        assertEquals(RecordState.ACKNOWLEDGED, RecordState.ACQUIRED.validateTransition(RecordState.ACKNOWLEDGED));
+        assertEquals(RecordState.ARCHIVED, RecordState.ACQUIRED.validateTransition(RecordState.ARCHIVED));
+    }
+
+    @Test
+    public void testRecordStateForId() {
+        assertEquals(RecordState.AVAILABLE, RecordState.forId((byte) 0));
+        assertEquals(RecordState.ACQUIRED, RecordState.forId((byte) 1));
+        assertEquals(RecordState.ACKNOWLEDGED, RecordState.forId((byte) 2));
+        assertEquals(RecordState.ARCHIVED, RecordState.forId((byte) 4));
+        // Invalid check.
+        assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5));
+    }
+}