This commit is contained in:
JimmyWang6 2025-09-25 17:48:17 +08:00 committed by jimmy
parent 07d3d0275f
commit 20dd01ab56
1 changed files with 8 additions and 9 deletions

View File

@ -675,8 +675,8 @@ public class SharePartition {
FetchIsolation isolationLevel
) {
log.trace("Received acquire request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);
// Since the client side changes are not yet in place, we use BATCH_OPTIMIZED mode as the default AcquireMode.
ShareAcquireMode acquireMode = acquireMode(ShareAcquireMode.STRICT.toString());
// Since the client side changes are not yet in place, use BATCH_OPTIMIZED as the default AcquireMode.
ShareAcquireMode acquireMode = acquireMode(ShareAcquireMode.BATCH_OPTIMIZED.toString());
boolean strict = ShareAcquireMode.STRICT.equals(acquireMode);
if (stateNotActive() || maxFetchRecords <= 0) {
// Nothing to acquire.
@ -798,7 +798,7 @@ public class SharePartition {
// Hence, after the loop iteration the next gap can be considered.
maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
// If the acquired count is equal to the max fetch records then break the loop.
// In strict mode, we should only acquire one batch so also break here.
// In strict mode, only one batch should be acquired so also break the loop.
if (strict || acquiredCount >= maxRecordsToAcquire) {
break;
}
@ -1586,13 +1586,12 @@ public class SharePartition {
// Check how many records can be acquired from the batch.
long lastAcquiredOffset = lastOffset;
long maxOffset = firstAcquiredOffset + maxFetchRecords - 1;
if (maxFetchRecords < lastAcquiredOffset - firstAcquiredOffset + 1) {
// The max records to acquire is less than the complete available batches, limit the acquired records.
// In BATCH_OPTIMIZED mode, the last offset shall be the batches last offset that falls under
// the max records limit, which means the last offset can be higher than the max records.
// In STRICT mode, strictly limit the acquired records to maxFetchRecords.
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, maxOffset);
// The max records to acquire is less than the complete available batches hence
// limit the acquired records. The last offset shall be the batches last offset
// which falls under the max records limit. As the max fetch records is the soft
// limit, the last offset can be higher than the max records.
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
// If the initial read gap offset window is active then it's not guaranteed that the
// batches align on batch boundaries. Hence, reset to last offset itself if the batch's
// last offset is greater than the last offset for acquisition, else there could be