KAFKA-19464: Remove unnecessary update for find next fetch offset (#20315)

The PR removes unnecessary updates for find next fetch offset. When the
state is in transition and not yet completed then anyways respective
offsets should not be considered for acquisition. The find next fetch
offset is updated finally when transition is completed.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Abhinav Dixit
<adixit@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-08-07 13:11:07 +01:00 committed by GitHub
parent 2329def2ff
commit f12a9d8413
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 6 additions and 27 deletions

View File

@ -803,7 +803,7 @@ public class SharePartition {
} }
InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount, memberId); InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount, memberId);
if (updateResult == null) { if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) {
log.info("Unable to acquire records for the batch: {} in share partition: {}-{}", log.info("Unable to acquire records for the batch: {} in share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition); inFlightBatch, groupId, topicIdPartition);
continue; continue;
@ -1009,12 +1009,7 @@ public class SharePartition {
updatedStates.add(updateResult); updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(), stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state().id(), (short) updateResult.deliveryCount())); updateResult.state().id(), (short) updateResult.deliveryCount()));
// Do not update the next fetch offset as the offset has not completed the transition yet.
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
} }
} }
return Optional.empty(); return Optional.empty();
@ -1054,12 +1049,7 @@ public class SharePartition {
updatedStates.add(updateResult); updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state().id(), (short) updateResult.deliveryCount())); updateResult.state().id(), (short) updateResult.deliveryCount()));
// Do not update the next fetch offset as the batch has not completed the transition yet.
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
} }
return Optional.empty(); return Optional.empty();
} }
@ -1641,7 +1631,7 @@ public class SharePartition {
InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
maxDeliveryCount, memberId); maxDeliveryCount, memberId);
if (updateResult == null) { if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) {
log.trace("Unable to acquire records for the offset: {} in batch: {}" log.trace("Unable to acquire records for the offset: {} in batch: {}"
+ " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch, + " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition); groupId, topicIdPartition);
@ -1941,12 +1931,7 @@ public class SharePartition {
updatedStates.add(updateResult); updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(), stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state().id(), (short) updateResult.deliveryCount())); updateResult.state().id(), (short) updateResult.deliveryCount()));
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state. // Do not update the nextFetchOffset as the offset has not completed the transition yet.
// This should not change the next fetch offset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
} }
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
@ -1996,13 +1981,7 @@ public class SharePartition {
stateBatches.add( stateBatches.add(
new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state().id(), (short) updateResult.deliveryCount())); updateResult.state().id(), (short) updateResult.deliveryCount()));
// Do not update the next fetch offset as the batch has not completed the transition yet.
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the nextFetchOffset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }