KAFKA-18265: Move inflight batch and state classes from SharePartition (2/N) (#20230)
CI / build (push) Waiting to run Details

Another refactor PR to move in-flight batch and state out of
SharePartition. This PR concludes the refactoring and subsequent PRs for
this ticket will involve code cleanups and better lock handling. However
the intent is to keep PRs small so they can be reviewed easily.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-07-23 23:01:23 +01:00 committed by GitHub
parent a663ce3f45
commit d350f603a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 712 additions and 442 deletions

View File

@ -45,6 +45,10 @@ import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DeliveryCountOps;
import org.apache.kafka.server.share.fetch.InFlightBatch;
import org.apache.kafka.server.share.fetch.InFlightState;
import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
@ -61,7 +65,6 @@ import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.slf4j.Logger;
@ -134,80 +137,6 @@ public class SharePartition {
FENCED
}
/**
* The RecordState is used to track the state of a record that has been fetched from the leader.
* The state of the records determines if the records should be re-delivered, move the next fetch
* offset, or be state persisted to disk.
*/
// Visible for testing
enum RecordState {
AVAILABLE((byte) 0),
ACQUIRED((byte) 1),
ACKNOWLEDGED((byte) 2),
ARCHIVED((byte) 4);
public final byte id;
RecordState(byte id) {
this.id = id;
}
/**
* Validates that the <code>newState</code> is one of the valid transition from the current
* {@code RecordState}.
*
* @param newState State into which requesting to transition; must be non-<code>null</code>
*
* @return {@code RecordState} <code>newState</code> if validation succeeds. Returning
* <code>newState</code> helps state assignment chaining.
*
* @throws IllegalStateException if the state transition validation fails.
*/
public RecordState validateTransition(RecordState newState) throws IllegalStateException {
Objects.requireNonNull(newState, "newState cannot be null");
if (this == newState) {
throw new IllegalStateException("The state transition is invalid as the new state is"
+ "the same as the current state");
}
if (this == ACKNOWLEDGED || this == ARCHIVED) {
throw new IllegalStateException("The state transition is invalid from the current state: " + this);
}
if (this == AVAILABLE && newState != ACQUIRED) {
throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
}
// Either the transition is from Available -> Acquired or from Acquired -> Available/
// Acknowledged/Archived.
return newState;
}
public static RecordState forId(byte id) {
return switch (id) {
case 0 -> AVAILABLE;
case 1 -> ACQUIRED;
case 2 -> ACKNOWLEDGED;
case 4 -> ARCHIVED;
default -> throw new IllegalArgumentException("Unknown record state id: " + id);
};
}
public byte id() {
return this.id;
}
}
/**
* The DeliveryCountOps is used to specify the behavior on the delivery count: increase, decrease,
* or do nothing.
*/
private enum DeliveryCountOps {
INCREASE,
DECREASE,
NO_OP
}
/**
* The group id of the share partition belongs to.
*/
@ -307,6 +236,11 @@ public class SharePartition {
*/
private final SharePartitionMetrics sharePartitionMetrics;
/**
* The acquisition lock timeout handler is used to handle the acquisition lock timeout for the share partition.
*/
private final AcquisitionLockTimeoutHandler timeoutHandler;
/**
* The share partition start offset specifies the partition start offset from which the records
* are cached in the cachedState of the sharePartition.
@ -425,6 +359,7 @@ public class SharePartition {
this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition);
this.listener = listener;
this.sharePartitionMetrics = sharePartitionMetrics;
this.timeoutHandler = releaseAcquisitionLockOnTimeout();
this.registerGaugeMetrics();
}
@ -528,8 +463,9 @@ public class SharePartition {
gapStartOffset = previousBatchLastOffset + 1;
}
previousBatchLastOffset = stateBatch.lastOffset();
InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null);
InFlightBatch inFlightBatch = new InFlightBatch(timer, time, EMPTY_MEMBER_ID, stateBatch.firstOffset(),
stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(),
null, timeoutHandler, sharePartitionMetrics);
cachedState.put(stateBatch.firstOffset(), inFlightBatch);
sharePartitionMetrics.recordInFlightBatchMessageCount(stateBatch.lastOffset() - stateBatch.firstOffset() + 1);
}
@ -649,7 +585,7 @@ public class SharePartition {
} else {
// The offset state is maintained hence find the next available offset.
for (Map.Entry<Long, InFlightState> offsetState : entry.getValue().offsetState().entrySet()) {
if (offsetState.getValue().state == RecordState.AVAILABLE && !offsetState.getValue().hasOngoingStateTransition()) {
if (offsetState.getValue().state() == RecordState.AVAILABLE && !offsetState.getValue().hasOngoingStateTransition()) {
nextFetchOffset = offsetState.getKey();
break;
}
@ -1034,7 +970,7 @@ public class SharePartition {
log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", inFlightBatch,
groupId, topicIdPartition);
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
// Check if member id is the owner of the offset.
if (!offsetState.getValue().memberId().equals(memberId) && !offsetState.getValue().memberId().equals(EMPTY_MEMBER_ID)) {
@ -1042,7 +978,7 @@ public class SharePartition {
+ " partition: {}-{}. Skipping offset.", memberId, offsetState.getKey(), inFlightBatch, groupId, topicIdPartition);
return Optional.empty();
}
if (offsetState.getValue().state == RecordState.ACQUIRED) {
if (offsetState.getValue().state() == RecordState.ACQUIRED) {
// These records were fetched but they were not actually delivered to the client.
InFlightState updateResult = offsetState.getValue().startStateTransition(
offsetState.getKey() < startOffset ? RecordState.ARCHIVED : recordState,
@ -1060,11 +996,11 @@ public class SharePartition {
// Successfully updated the state of the offset.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state.id, (short) updateResult.deliveryCount));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state != RecordState.ARCHIVED) {
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@ -1105,11 +1041,11 @@ public class SharePartition {
// Successfully updated the state of the batch.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state.id, (short) updateResult.deliveryCount));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state != RecordState.ARCHIVED) {
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@ -1293,7 +1229,7 @@ public class SharePartition {
// No further offsets to process.
break;
}
if (offsetState.getValue().state != initialState) {
if (offsetState.getValue().state() != initialState) {
continue;
}
@ -1317,7 +1253,7 @@ public class SharePartition {
// Change the state of complete batch since the same state exists for the entire inFlight batch.
inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
if (initialState == RecordState.ACQUIRED) {
inFlightBatch.batchState.cancelAndClearAcquisitionLockTimeoutTask();
inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
}
return true;
}
@ -1610,12 +1546,16 @@ public class SharePartition {
AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
cachedState.put(acquiredRecords.firstOffset(), new InFlightBatch(
timer,
time,
memberId,
acquiredRecords.firstOffset(),
acquiredRecords.lastOffset(),
RecordState.ACQUIRED,
1,
timerTask));
timerTask,
timeoutHandler,
sharePartitionMetrics));
// Update the in-flight batch message count metrics for the share partition.
sharePartitionMetrics.recordInFlightBatchMessageCount(acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
});
@ -1635,7 +1575,7 @@ public class SharePartition {
lock.writeLock().lock();
int acquiredCount = 0;
try {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
// For the first batch which might have offsets prior to the request base
// offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
if (offsetState.getKey() < requestFirstOffset) {
@ -1647,7 +1587,7 @@ public class SharePartition {
break;
}
if (offsetState.getValue().state != RecordState.AVAILABLE || offsetState.getValue().hasOngoingStateTransition()) {
if (offsetState.getValue().state() != RecordState.AVAILABLE || offsetState.getValue().hasOngoingStateTransition()) {
log.trace("The offset {} is not available in share partition: {}-{}, skipping: {}",
offsetState.getKey(), groupId, topicIdPartition, inFlightBatch);
continue;
@ -1670,7 +1610,7 @@ public class SharePartition {
result.add(new AcquiredRecords()
.setFirstOffset(offsetState.getKey())
.setLastOffset(offsetState.getKey())
.setDeliveryCount((short) offsetState.getValue().deliveryCount));
.setDeliveryCount((short) offsetState.getValue().deliveryCount()));
acquiredCount++;
}
} finally {
@ -1758,7 +1698,7 @@ public class SharePartition {
NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, batch.lastOffset(), true);
// Validate if the request batch has the first offset greater than the last offset of the last
// fetched cached batch, then there will be no offsets in the request that can be acknowledged.
if (subMap.lastEntry().getValue().lastOffset < batch.firstOffset()) {
if (subMap.lastEntry().getValue().lastOffset() < batch.firstOffset()) {
log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
throw new InvalidRequestException("Batch record not found. The first offset in request is past acquired records.");
}
@ -1766,7 +1706,7 @@ public class SharePartition {
// Validate if the request batch has the last offset greater than the last offset of
// the last fetched cached batch, then there will be offsets in the request than cannot
// be found in the fetched batches.
if (batch.lastOffset() > subMap.lastEntry().getValue().lastOffset) {
if (batch.lastOffset() > subMap.lastEntry().getValue().lastOffset()) {
log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
throw new InvalidRequestException("Batch record not found. The last offset in request is past acquired records.");
}
@ -1888,7 +1828,7 @@ public class SharePartition {
// Fetch the first record state from the map to be used as default record state in case the
// offset record state is not provided by client.
RecordState recordStateDefault = recordStateMap.get(batch.firstOffset());
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
// 1. For the first batch which might have offsets prior to the request base
// offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
@ -1902,7 +1842,7 @@ public class SharePartition {
break;
}
if (offsetState.getValue().state != RecordState.ACQUIRED) {
if (offsetState.getValue().state() != RecordState.ACQUIRED) {
log.debug("The offset is not acquired, offset: {} batch: {} for the share"
+ " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId,
topicIdPartition);
@ -1911,7 +1851,7 @@ public class SharePartition {
}
// Check if member id is the owner of the offset.
if (!offsetState.getValue().memberId.equals(memberId)) {
if (!offsetState.getValue().memberId().equals(memberId)) {
log.debug("Member {} is not the owner of offset: {} in batch: {} for the share"
+ " partition: {}-{}", memberId, offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition);
@ -1940,11 +1880,11 @@ public class SharePartition {
// Successfully updated the state of the offset.
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state.id, (short) updateResult.deliveryCount));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state != RecordState.ARCHIVED) {
&& updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@ -1994,13 +1934,13 @@ public class SharePartition {
// Successfully updated the state of the batch.
updatedStates.add(updateResult);
stateBatches.add(
new PersisterStateBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset,
updateResult.state.id, (short) updateResult.deliveryCount));
new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the nextFetchOffset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state != RecordState.ARCHIVED) {
&& updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
} finally {
@ -2086,7 +2026,7 @@ public class SharePartition {
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
if (state.state == RecordState.AVAILABLE) {
if (state.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
});
@ -2253,7 +2193,7 @@ public class SharePartition {
}
lastOffsetAcknowledged = inFlightBatch.lastOffset();
} else {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
if (offsetState.getValue().hasOngoingStateTransition() || !isRecordStateAcknowledged(offsetState.getValue().state())) {
return lastOffsetAcknowledged;
}
@ -2465,11 +2405,11 @@ public class SharePartition {
return;
}
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state.id, (short) updateResult.deliveryCount));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// Cancel the acquisition lock timeout task for the batch since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
return;
@ -2494,7 +2434,7 @@ public class SharePartition {
// No further offsets to process.
break;
}
if (offsetState.getValue().state != RecordState.ACQUIRED) {
if (offsetState.getValue().state() != RecordState.ACQUIRED) {
log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {}"
+ " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition, memberId);
@ -2512,11 +2452,11 @@ public class SharePartition {
continue;
}
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state.id, (short) updateResult.deliveryCount));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// Cancel the acquisition lock timeout task for the offset since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
}
@ -2838,300 +2778,6 @@ public class SharePartition {
}
}
/**
* The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
*/
final class InFlightBatch {
// The offset of the first record in the batch that is fetched from the log.
private final long firstOffset;
// The last offset of the batch that is fetched from the log.
private final long lastOffset;
// The batch state of the fetched records. If the offset state map is empty then batchState
// determines the state of the complete batch else individual offset determines the state of
// the respective records.
private InFlightState batchState;
// The offset state map is used to track the state of the records per offset. However, the
// offset state map is only required when the state of the offsets within same batch are
// different. The states can be different when explicit offset acknowledgment is done which
// is different from the batch state.
private NavigableMap<Long, InFlightState> offsetState;
InFlightBatch(String memberId, long firstOffset, long lastOffset, RecordState state,
int deliveryCount, AcquisitionLockTimerTask acquisitionLockTimeoutTask
) {
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
this.batchState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
}
// Visible for testing.
long firstOffset() {
return firstOffset;
}
// Visible for testing.
long lastOffset() {
return lastOffset;
}
// Visible for testing.
RecordState batchState() {
return inFlightState().state;
}
// Visible for testing.
String batchMemberId() {
if (batchState == null) {
throw new IllegalStateException("The batch member id is not available as the offset state is maintained");
}
return batchState.memberId;
}
// Visible for testing.
int batchDeliveryCount() {
if (batchState == null) {
throw new IllegalStateException("The batch delivery count is not available as the offset state is maintained");
}
return batchState.deliveryCount;
}
// Visible for testing.
AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
return inFlightState().acquisitionLockTimeoutTask;
}
// Visible for testing.
NavigableMap<Long, InFlightState> offsetState() {
return offsetState;
}
private InFlightState inFlightState() {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
return batchState;
}
// Visible for testing.
boolean batchHasOngoingStateTransition() {
return inFlightState().hasOngoingStateTransition();
}
private void archiveBatch(String newMemberId) {
inFlightState().archive(newMemberId);
}
private InFlightState tryUpdateBatchState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
if (batchState == null) {
throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
}
return batchState.tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
}
private InFlightState startBatchStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount,
String newMemberId) {
if (batchState == null) {
throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
}
return batchState.startStateTransition(newState, ops, maxDeliveryCount, newMemberId);
}
private void maybeInitializeOffsetStateUpdate() {
if (offsetState == null) {
offsetState = new ConcurrentSkipListMap<>();
// The offset state map is not initialized hence initialize the state of the offsets
// from the first offset to the last offset. Mark the batch inflightState to null as
// the state of the records is maintained in the offset state map now.
for (long offset = this.firstOffset; offset <= this.lastOffset; offset++) {
if (batchState.acquisitionLockTimeoutTask != null) {
// The acquisition lock timeout task is already scheduled for the batch, hence we need to schedule
// the acquisition lock timeout task for the offset as well.
long delayMs = batchState.acquisitionLockTimeoutTask.expirationMs() - time.hiResClockMs();
AcquisitionLockTimerTask timerTask = acquisitionLockTimerTask(batchState.memberId, offset, offset, delayMs);
offsetState.put(offset, new InFlightState(batchState.state, batchState.deliveryCount, batchState.memberId, timerTask));
timer.add(timerTask);
} else {
offsetState.put(offset, new InFlightState(batchState.state, batchState.deliveryCount, batchState.memberId));
}
}
// Cancel the acquisition lock timeout task for the batch as the offset state is maintained.
if (batchState.acquisitionLockTimeoutTask != null) {
batchState.cancelAndClearAcquisitionLockTimeoutTask();
}
batchState = null;
}
}
private void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
inFlightState().acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
@Override
public String toString() {
return "InFlightBatch(" +
"firstOffset=" + firstOffset +
", lastOffset=" + lastOffset +
", inFlightState=" + batchState +
", offsetState=" + ((offsetState == null) ? "null" : offsetState) +
")";
}
}
/**
* The InFlightState is used to track the state and delivery count of a record that has been
* fetched from the leader. The state of the record is used to determine if the record should
* be re-deliver or if it can be acknowledged or archived.
*/
static final class InFlightState {
// The state of the fetch batch records.
private RecordState state;
// The number of times the records has been delivered to the client.
private int deliveryCount;
// The member id of the client that is fetching/acknowledging the record.
private String memberId;
// The state of the records before the transition. In case we need to revert an in-flight state, we revert the above
// attributes of InFlightState to this state, namely - state, deliveryCount and memberId.
private InFlightState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
InFlightState(RecordState state, int deliveryCount, String memberId) {
this(state, deliveryCount, memberId, null);
}
InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
this.state = state;
this.deliveryCount = deliveryCount;
this.memberId = memberId;
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
// Visible for testing.
RecordState state() {
return state;
}
String memberId() {
return memberId;
}
// Visible for testing.
TimerTask acquisitionLockTimeoutTask() {
return acquisitionLockTimeoutTask;
}
void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) throws IllegalArgumentException {
if (this.acquisitionLockTimeoutTask != null) {
throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
}
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
void cancelAndClearAcquisitionLockTimeoutTask() {
acquisitionLockTimeoutTask.cancel();
acquisitionLockTimeoutTask = null;
}
// Visible for testing.
boolean hasOngoingStateTransition() {
if (rollbackState == null) {
// This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
// been committed.
return false;
}
return rollbackState.state != null;
}
/**
* Try to update the state of the records. The state of the records can only be updated if the
* new state is allowed to be transitioned from old state. The delivery count is not changed
* if the state update is unsuccessful.
*
* @param newState The new state of the records.
* @param ops The behavior on the delivery count.
*
* @return {@code InFlightState} if update succeeds, null otherwise. Returning state
* helps update chaining.
*/
private InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
try {
if (newState == RecordState.AVAILABLE && ops != DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
newState = RecordState.ARCHIVED;
}
state = state.validateTransition(newState);
if (newState != RecordState.ARCHIVED) {
deliveryCount = updatedDeliveryCount(ops);
}
memberId = newMemberId;
return this;
} catch (IllegalStateException e) {
log.error("Failed to update state of the records", e);
rollbackState = null;
return null;
}
}
private int updatedDeliveryCount(DeliveryCountOps ops) {
return switch (ops) {
case INCREASE -> deliveryCount + 1;
case DECREASE -> deliveryCount - 1;
// do nothing
case NO_OP -> deliveryCount;
};
}
private void archive(String newMemberId) {
state = RecordState.ARCHIVED;
memberId = newMemberId;
}
private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
}
private void completeStateTransition(boolean commit) {
if (commit) {
rollbackState = null;
return;
}
state = rollbackState.state;
deliveryCount = rollbackState.deliveryCount;
memberId = rollbackState.memberId;
rollbackState = null;
}
@Override
public int hashCode() {
return Objects.hash(state, deliveryCount, memberId);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InFlightState that = (InFlightState) o;
return state == that.state && deliveryCount == that.deliveryCount && memberId.equals(that.memberId);
}
@Override
public String toString() {
return "InFlightState(" +
"state=" + state.toString() +
", deliveryCount=" + deliveryCount +
", memberId=" + memberId +
")";
}
}
/**
* FetchOffsetMetadata class is used to cache offset and its log metadata.
*/

View File

@ -17,8 +17,6 @@
package kafka.server.share;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartition.InFlightState;
import kafka.server.share.SharePartition.RecordState;
import kafka.server.share.SharePartition.SharePartitionState;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
@ -57,6 +55,8 @@ import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.InFlightState;
import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.share.persister.NoOpStatePersister;
@ -140,45 +140,6 @@ public class SharePartitionTest {
sharePartitionMetrics.close();
}
@Test
public void testRecordStateValidateTransition() {
// Null check.
assertThrows(NullPointerException.class, () -> RecordState.AVAILABLE.validateTransition(null));
// Same state transition check.
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.AVAILABLE));
assertThrows(IllegalStateException.class, () -> RecordState.ACQUIRED.validateTransition(RecordState.ACQUIRED));
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACKNOWLEDGED));
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
// Invalid state transition to any other state from Acknowledged state.
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.AVAILABLE));
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACQUIRED));
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ARCHIVED));
// Invalid state transition to any other state from Archived state.
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.AVAILABLE));
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACKNOWLEDGED));
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
// Invalid state transition to any other state from Available state other than Acquired.
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ACKNOWLEDGED));
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ARCHIVED));
// Successful transition from Available to Acquired.
assertEquals(RecordState.ACQUIRED, RecordState.AVAILABLE.validateTransition(RecordState.ACQUIRED));
// Successful transition from Acquired to any state.
assertEquals(RecordState.AVAILABLE, RecordState.ACQUIRED.validateTransition(RecordState.AVAILABLE));
assertEquals(RecordState.ACKNOWLEDGED, RecordState.ACQUIRED.validateTransition(RecordState.ACKNOWLEDGED));
assertEquals(RecordState.ARCHIVED, RecordState.ACQUIRED.validateTransition(RecordState.ARCHIVED));
}
@Test
public void testRecordStateForId() {
assertEquals(RecordState.AVAILABLE, RecordState.forId((byte) 0));
assertEquals(RecordState.ACQUIRED, RecordState.forId((byte) 1));
assertEquals(RecordState.ACKNOWLEDGED, RecordState.forId((byte) 2));
assertEquals(RecordState.ARCHIVED, RecordState.forId((byte) 4));
// Invalid check.
assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5));
}
@Test
public void testMaybeInitialize() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;
/**
* The DeliveryCountOps is used to specify the behavior on the delivery count: increase, decrease,
* or do nothing.
*/
public enum DeliveryCountOps {
INCREASE, DECREASE, NO_OP
}

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.util.timer.Timer;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
*/
public class InFlightBatch {
// The timer is used to schedule the acquisition lock timeout task for the batch.
private final Timer timer;
// The time is used to get the current time in milliseconds.
private final Time time;
// The offset of the first record in the batch that is fetched from the log.
private final long firstOffset;
// The last offset of the batch that is fetched from the log.
private final long lastOffset;
// The acquisition lock timeout handler is used to release the acquired records when the acquisition
// lock timeout is reached.
private final AcquisitionLockTimeoutHandler timeoutHandler;
// The share partition metrics are used to track the metrics related to the share partition.
private final SharePartitionMetrics sharePartitionMetrics;
// The batch state of the fetched records. If the offset state map is empty then batchState
// determines the state of the complete batch else individual offset determines the state of
// the respective records.
private InFlightState batchState;
// The offset state map is used to track the state of the records per offset. However, the
// offset state map is only required when the state of the offsets within same batch are
// different. The states can be different when explicit offset acknowledgment is done which
// is different from the batch state.
private NavigableMap<Long, InFlightState> offsetState;
public InFlightBatch(
Timer timer,
Time time,
String memberId,
long firstOffset,
long lastOffset,
RecordState state,
int deliveryCount,
AcquisitionLockTimerTask acquisitionLockTimeoutTask,
AcquisitionLockTimeoutHandler timeoutHandler,
SharePartitionMetrics sharePartitionMetrics
) {
this.timer = timer;
this.time = time;
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
this.timeoutHandler = timeoutHandler;
this.sharePartitionMetrics = sharePartitionMetrics;
this.batchState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
}
/**
* @return the first offset of the batch.
*/
public long firstOffset() {
return firstOffset;
}
/**
* @return the last offset of the batch.
*/
public long lastOffset() {
return lastOffset;
}
/**
* @return the state of the batch.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public RecordState batchState() {
return inFlightState().state();
}
/**
* @return the member id of the batch.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public String batchMemberId() {
return inFlightState().memberId();
}
/**
* @return the delivery count of the batch.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public int batchDeliveryCount() {
return inFlightState().deliveryCount();
}
/**
* @return the acquisition lock timeout task for the batch.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
return inFlightState().acquisitionLockTimeoutTask();
}
/**
* @return the offset state map which maintains the state of the records per offset.
*/
public NavigableMap<Long, InFlightState> offsetState() {
return offsetState;
}
/**
* Cancel the acquisition lock timeout task and clear the reference to it.
* This method is used to cancel the acquisition lock timeout task if it exists
* and clear the reference to it.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public void cancelAndClearAcquisitionLockTimeoutTask() {
inFlightState().cancelAndClearAcquisitionLockTimeoutTask();
}
/**
* @return true if the batch has an ongoing state transition, false otherwise.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public boolean batchHasOngoingStateTransition() {
return inFlightState().hasOngoingStateTransition();
}
/**
* Archive the batch state. This is used to mark the batch as archived and no further updates
* are allowed to the batch state.
* @param newMemberId The new member id for the records.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public void archiveBatch(String newMemberId) {
inFlightState().archive(newMemberId);
}
/**
* Try to update the batch state. The state of the batch can only be updated if the new state is allowed
* to be transitioned from old state. The delivery count is not changed if the state update is unsuccessful.
*
* @param newState The new state of the records.
* @param ops The behavior on the delivery count.
* @param maxDeliveryCount The maximum delivery count for the records.
* @param newMemberId The new member id for the records.
* @return {@code InFlightState} if update succeeds, null otherwise. Returning state helps update chaining.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public InFlightState tryUpdateBatchState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
return inFlightState().tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
}
/**
* Start a state transition for the batch. This is used to mark the batch as in-flight and
* no further updates are allowed to the batch state.
*
* @param newState The new state of the records.
* @param ops The behavior on the delivery count.
* @param maxDeliveryCount The maximum delivery count for the records.
* @param newMemberId The new member id for the records.
* @return {@code InFlightState} if update succeeds, null otherwise. Returning state helps update chaining.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public InFlightState startBatchStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount,
String newMemberId
) {
return inFlightState().startStateTransition(newState, ops, maxDeliveryCount, newMemberId);
}
/**
* Initialize the offset state map if it is not already initialized. This is used to maintain the state of the
* records per offset when the state of the offsets within same batch are different.
*/
public void maybeInitializeOffsetStateUpdate() {
if (offsetState == null) {
offsetState = new ConcurrentSkipListMap<>();
// The offset state map is not initialized hence initialize the state of the offsets
// from the first offset to the last offset. Mark the batch inflightState to null as
// the state of the records is maintained in the offset state map now.
for (long offset = this.firstOffset; offset <= this.lastOffset; offset++) {
if (batchState.acquisitionLockTimeoutTask() != null) {
// The acquisition lock timeout task is already scheduled for the batch, hence we need to schedule
// the acquisition lock timeout task for the offset as well.
long delayMs = batchState.acquisitionLockTimeoutTask().expirationMs() - time.hiResClockMs();
AcquisitionLockTimerTask timerTask = acquisitionLockTimerTask(batchState.memberId(), offset, offset, delayMs);
offsetState.put(offset, new InFlightState(batchState.state(), batchState.deliveryCount(), batchState.memberId(), timerTask));
timer.add(timerTask);
} else {
offsetState.put(offset, new InFlightState(batchState.state(), batchState.deliveryCount(), batchState.memberId()));
}
}
// Cancel the acquisition lock timeout task for the batch as the offset state is maintained.
if (batchState.acquisitionLockTimeoutTask() != null) {
batchState.cancelAndClearAcquisitionLockTimeoutTask();
}
batchState = null;
}
}
/**
* Update the acquisition lock timeout task for the batch. This is used to update the acquisition lock timeout
* task for the batch when the acquisition lock timeout is changed.
*
* @param acquisitionLockTimeoutTask The new acquisition lock timeout task for the batch.
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
*/
public void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
inFlightState().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
}
private InFlightState inFlightState() {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
return batchState;
}
private AcquisitionLockTimerTask acquisitionLockTimerTask(
String memberId,
long firstOffset,
long lastOffset,
long delayMs
) {
return new AcquisitionLockTimerTask(time, delayMs, memberId, firstOffset, lastOffset, timeoutHandler, sharePartitionMetrics);
}
@Override
public String toString() {
return "InFlightBatch(" +
"firstOffset=" + firstOffset +
", lastOffset=" + lastOffset +
", inFlightState=" + batchState +
", offsetState=" + ((offsetState == null) ? "null" : offsetState) +
")";
}
}

View File

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
/**
* The InFlightState is used to track the state and delivery count of a record that has been
* fetched from the leader. The state of the record is used to determine if the record should
* be re-deliver or if it can be acknowledged or archived.
*/
public class InFlightState {
private static final Logger log = LoggerFactory.getLogger(InFlightState.class);
// The state of the fetch batch records.
private RecordState state;
// The number of times the records has been delivered to the client.
private int deliveryCount;
// The member id of the client that is fetching/acknowledging the record.
private String memberId;
// The state of the records before the transition. In case we need to revert an in-flight state, we revert the above
// attributes of InFlightState to this state, namely - state, deliveryCount and memberId.
private InFlightState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
// Visible for testing.
public InFlightState(RecordState state, int deliveryCount, String memberId) {
this(state, deliveryCount, memberId, null);
}
InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
this.state = state;
this.deliveryCount = deliveryCount;
this.memberId = memberId;
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
/**
* @return The current state of the record.
*/
public RecordState state() {
return state;
}
/**
* @return The number of times the record has been delivered.
*/
public int deliveryCount() {
return deliveryCount;
}
/**
* @return The member id of the client that is fetching/acknowledging the record.
*/
public String memberId() {
return memberId;
}
/**
* @return The timer task for the acquisition lock timeout.
*/
public AcquisitionLockTimerTask acquisitionLockTimeoutTask() {
return acquisitionLockTimeoutTask;
}
/**
* Update the acquisition lock timeout task. This method is used to set the acquisition lock
* timeout task for the record. If there is already an acquisition lock timeout task set,
* it throws an IllegalArgumentException.
*
* @param acquisitionLockTimeoutTask The new acquisition lock timeout task to set.
* @throws IllegalArgumentException if there is already an acquisition lock timeout task set.
*/
public void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) throws IllegalArgumentException {
if (this.acquisitionLockTimeoutTask != null) {
throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
}
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
/**
* Cancel the acquisition lock timeout task and clear the reference to it.
* This method is used to cancel the acquisition lock timeout task if it exists
* and clear the reference to it.
*/
public void cancelAndClearAcquisitionLockTimeoutTask() {
acquisitionLockTimeoutTask.cancel();
acquisitionLockTimeoutTask = null;
}
/**
* Check if there is an ongoing state transition for the records.
* This method checks if the rollbackState is not null, which indicates that
* there has been a state transition that has not been committed yet.
*
* @return true if there is an ongoing state transition, false otherwise.
*/
public boolean hasOngoingStateTransition() {
if (rollbackState == null) {
// This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
// been committed.
return false;
}
return rollbackState.state != null;
}
/**
* Try to update the state of the records. The state of the records can only be updated if the
* new state is allowed to be transitioned from old state. The delivery count is not changed
* if the state update is unsuccessful.
*
* @param newState The new state of the records.
* @param ops The behavior on the delivery count.
* @param maxDeliveryCount The maximum delivery count for the record.
* @param newMemberId The member id of the client that is fetching/acknowledging the record.
*
* @return {@code InFlightState} if update succeeds, null otherwise. Returning state
* helps update chaining.
*/
public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
try {
if (newState == RecordState.AVAILABLE && ops != DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
newState = RecordState.ARCHIVED;
}
state = state.validateTransition(newState);
if (newState != RecordState.ARCHIVED) {
deliveryCount = updatedDeliveryCount(ops);
}
memberId = newMemberId;
return this;
} catch (IllegalStateException e) {
log.error("Failed to update state of the records", e);
rollbackState = null;
return null;
}
}
/**
* Archive the record by setting its state to ARCHIVED, clearing the memberId and
* cancelling the acquisition lock timeout task.
* This method is used to archive the record when it is no longer needed.
*/
public void archive(String newMemberId) {
state = RecordState.ARCHIVED;
memberId = newMemberId;
}
/**
* Start a state transition for the records. This method is used to start a state transition
* for the records. It creates a copy of the current state and sets it as the rollback state.
* If the state transition is successful, it returns the updated state.
*
* @param newState The new state of the records.
* @param ops The behavior on the delivery count.
* @param maxDeliveryCount The maximum delivery count for the record.
* @param newMemberId The member id of the client that is fetching/acknowledging the record.
*
* @return {@code InFlightState} if update succeeds, null otherwise. Returning state
* helps update chaining.
*/
public InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
}
/**
* Complete the state transition for the records. If the commit is true or the state is terminal,
* it cancels the acquisition lock timeout task and clears the rollback state.
* If the commit is false and the state is not terminal, it rolls back the state transition.
*
* @param commit If true, commits the state transition, otherwise rolls back.
*/
public void completeStateTransition(boolean commit) {
if (commit) {
rollbackState = null;
return;
}
state = rollbackState.state;
deliveryCount = rollbackState.deliveryCount;
memberId = rollbackState.memberId;
rollbackState = null;
}
private int updatedDeliveryCount(DeliveryCountOps ops) {
return switch (ops) {
case INCREASE -> deliveryCount + 1;
case DECREASE -> deliveryCount - 1;
// do nothing
case NO_OP -> deliveryCount;
};
}
@Override
public int hashCode() {
return Objects.hash(state, deliveryCount, memberId);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InFlightState that = (InFlightState) o;
return state == that.state && deliveryCount == that.deliveryCount && memberId.equals(that.memberId);
}
@Override
public String toString() {
return "InFlightState(" +
"state=" + state.toString() +
", deliveryCount=" + deliveryCount +
", memberId=" + memberId +
")";
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;
import java.util.Objects;
/**
* The RecordState is used to track the state of a record that has been fetched from the leader.
* The state of the records determines if the records should be re-delivered, move the next fetch
* offset, or be state persisted to disk.
*/
public enum RecordState {
AVAILABLE((byte) 0),
ACQUIRED((byte) 1),
ACKNOWLEDGED((byte) 2),
ARCHIVED((byte) 4);
public final byte id;
RecordState(byte id) {
this.id = id;
}
/**
* Validates that the <code>newState</code> is one of the valid transition from the current
* {@code RecordState}.
*
* @param newState State into which requesting to transition; must be non-<code>null</code>
*
* @return {@code RecordState} <code>newState</code> if validation succeeds. Returning
* <code>newState</code> helps state assignment chaining.
*
* @throws IllegalStateException if the state transition validation fails.
*/
public RecordState validateTransition(RecordState newState) throws IllegalStateException {
Objects.requireNonNull(newState, "newState cannot be null");
if (this == newState) {
throw new IllegalStateException("The state transition is invalid as the new state is"
+ "the same as the current state");
}
if (this == ACKNOWLEDGED || this == ARCHIVED) {
throw new IllegalStateException("The state transition is invalid from the current state: " + this);
}
if (this == AVAILABLE && newState != ACQUIRED) {
throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
}
// Either the transition is from Available -> Acquired or from Acquired -> Available/
// Acknowledged/Archived.
return newState;
}
public static RecordState forId(byte id) {
return switch (id) {
case 0 -> AVAILABLE;
case 1 -> ACQUIRED;
case 2 -> ACKNOWLEDGED;
case 4 -> ARCHIVED;
default -> throw new IllegalArgumentException("Unknown record state id: " + id);
};
}
public byte id() {
return this.id;
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class RecordStateTest {
@Test
public void testRecordStateValidateTransition() {
// Null check.
assertThrows(NullPointerException.class, () -> RecordState.AVAILABLE.validateTransition(null));
// Same state transition check.
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.AVAILABLE));
assertThrows(IllegalStateException.class, () -> RecordState.ACQUIRED.validateTransition(RecordState.ACQUIRED));
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACKNOWLEDGED));
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
// Invalid state transition to any other state from Acknowledged state.
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.AVAILABLE));
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACQUIRED));
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ARCHIVED));
// Invalid state transition to any other state from Archived state.
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.AVAILABLE));
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACKNOWLEDGED));
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
// Invalid state transition to any other state from Available state other than Acquired.
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ACKNOWLEDGED));
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ARCHIVED));
// Successful transition from Available to Acquired.
assertEquals(RecordState.ACQUIRED, RecordState.AVAILABLE.validateTransition(RecordState.ACQUIRED));
// Successful transition from Acquired to any state.
assertEquals(RecordState.AVAILABLE, RecordState.ACQUIRED.validateTransition(RecordState.AVAILABLE));
assertEquals(RecordState.ACKNOWLEDGED, RecordState.ACQUIRED.validateTransition(RecordState.ACKNOWLEDGED));
assertEquals(RecordState.ARCHIVED, RecordState.ACQUIRED.validateTransition(RecordState.ARCHIVED));
}
@Test
public void testRecordStateForId() {
assertEquals(RecordState.AVAILABLE, RecordState.forId((byte) 0));
assertEquals(RecordState.ACQUIRED, RecordState.forId((byte) 1));
assertEquals(RecordState.ACKNOWLEDGED, RecordState.forId((byte) 2));
assertEquals(RecordState.ARCHIVED, RecordState.forId((byte) 4));
// Invalid check.
assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5));
}
}