KAFKA-19476: Removing AtomicBoolean for findNextFetchOfffset (1/N) (#20207)
CI / build (push) Waiting to run Details

The PR refactors the findNextFetchOffset variable from AtomicBoolean to
boolean itself as the access is always done while holding a lock. This
also improves handling of `writeShareGroupState` method response where
now complete lock is not required, rather on sub-section.

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield
 <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-07-21 13:12:13 +01:00 committed by GitHub
parent f188a31124
commit f52f2b99e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 36 additions and 27 deletions

View File

@ -79,7 +79,6 @@ import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -237,11 +236,6 @@ public class SharePartition {
*/
private final ReadWriteLock lock;
/**
* The find next fetch offset is used to indicate if the next fetch offset should be recomputed.
*/
private final AtomicBoolean findNextFetchOffset;
/**
* The lock to ensure that the same share partition does not enter a fetch queue
* while another one is being fetched within the queue. The caller's id that acquires the fetch
@ -275,6 +269,11 @@ public class SharePartition {
*/
private final int defaultRecordLockDurationMs;
/**
* The find next fetch offset is used to indicate if the next fetch offset should be recomputed.
*/
private boolean findNextFetchOffset;
/**
* Timer is used to implement acquisition lock on records that guarantees the movement of records from
* acquired to available/archived state upon timeout
@ -410,7 +409,7 @@ public class SharePartition {
this.maxDeliveryCount = maxDeliveryCount;
this.cachedState = new ConcurrentSkipListMap<>();
this.lock = new ReentrantReadWriteLock();
this.findNextFetchOffset = new AtomicBoolean(false);
this.findNextFetchOffset = false;
this.fetchLock = new AtomicReference<>(null);
this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
this.timer = timer;
@ -536,7 +535,7 @@ public class SharePartition {
if (!cachedState.isEmpty()) {
// If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records
// in the cached state are not missed
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
endOffset = cachedState.lastEntry().getValue().lastOffset();
// initialReadGapOffset is not required, if there are no gaps in the read state response
if (gapStartOffset != -1) {
@ -599,7 +598,7 @@ public class SharePartition {
lock.writeLock().lock();
try {
// When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false
if (!findNextFetchOffset.get()) {
if (!findNextFetchOffset) {
if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset()) {
// 1. When cachedState is empty, endOffset is set to the next offset of the last
// offset removed from batch, which is the next offset to be fetched.
@ -618,7 +617,7 @@ public class SharePartition {
// If cachedState is empty, there is no need of re-computing next fetch offset in future fetch requests.
// Same case when startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO
// and the cached state is fresh.
findNextFetchOffset.set(false);
updateFindNextFetchOffset(false);
log.trace("The next fetch offset for the share partition {}-{} is {}", groupId, topicIdPartition, endOffset);
return endOffset;
}
@ -663,7 +662,7 @@ public class SharePartition {
// If nextFetchOffset is -1, then no AVAILABLE records are found in the cachedState, so there is no need of
// re-computing next fetch offset in future fetch requests
if (nextFetchOffset == -1) {
findNextFetchOffset.set(false);
updateFindNextFetchOffset(false);
nextFetchOffset = endOffset + 1;
}
log.trace("The next fetch offset for the share partition {}-{} is {}", groupId, topicIdPartition, nextFetchOffset);
@ -1064,7 +1063,7 @@ public class SharePartition {
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
}
}
@ -1109,7 +1108,7 @@ public class SharePartition {
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
// This should not change the next fetch offset because the record is not available for acquisition
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
}
return Optional.empty();
@ -1144,7 +1143,7 @@ public class SharePartition {
// If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED,
// then there is a chance that the next fetch offset can change.
if (anyRecordArchived) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
// The new startOffset will be the log start offset.
@ -1206,7 +1205,7 @@ public class SharePartition {
// If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED,
// then there is a chance that the next fetch offset can change.
if (anyRecordArchived) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
} finally {
lock.writeLock().unlock();
@ -1944,7 +1943,7 @@ public class SharePartition {
// This should not change the next fetch offset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
}
} finally {
@ -2000,7 +1999,7 @@ public class SharePartition {
// This should not change the nextFetchOffset because the record is not available for acquisition
if (recordState == RecordState.AVAILABLE
&& updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
} finally {
lock.writeLock().unlock();
@ -2086,11 +2085,11 @@ public class SharePartition {
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
if (state.state == RecordState.AVAILABLE) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
future.complete(null);
} finally {
lock.writeLock().unlock();
@ -2467,7 +2466,7 @@ public class SharePartition {
// Cancel the acquisition lock timeout task for the batch since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
return;
}
@ -2514,7 +2513,7 @@ public class SharePartition {
// Cancel the acquisition lock timeout task for the offset since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
updateFindNextFetchOffset(true);
}
}
}
@ -2748,12 +2747,22 @@ public class SharePartition {
// Visible for testing.
boolean findNextFetchOffset() {
return findNextFetchOffset.get();
lock.readLock().lock();
try {
return findNextFetchOffset;
} finally {
lock.readLock().unlock();
}
}
// Visible for testing. Should only be used for testing purposes.
void findNextFetchOffset(boolean findNextOffset) {
findNextFetchOffset.getAndSet(findNextOffset);
// Visible for testing.
void updateFindNextFetchOffset(boolean value) {
lock.writeLock().lock();
try {
findNextFetchOffset = value;
} finally {
lock.writeLock().unlock();
}
}
// Visible for testing

View File

@ -1564,7 +1564,7 @@ public class SharePartitionTest {
@Test
public void testNextFetchOffsetWithFindAndCachedStateEmpty() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
sharePartition.findNextFetchOffset(true);
sharePartition.updateFindNextFetchOffset(true);
assertTrue(sharePartition.findNextFetchOffset());
assertEquals(0, sharePartition.nextFetchOffset());
assertFalse(sharePartition.findNextFetchOffset());
@ -1573,7 +1573,7 @@ public class SharePartitionTest {
@Test
public void testNextFetchOffsetWithFindAndCachedState() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
sharePartition.findNextFetchOffset(true);
sharePartition.updateFindNextFetchOffset(true);
assertTrue(sharePartition.findNextFetchOffset());
fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);