From f52f2b99e5e23ca0fb5f6b8ab9948c686fe9c326 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Mon, 21 Jul 2025 13:12:13 +0100 Subject: [PATCH] KAFKA-19476: Removing AtomicBoolean for findNextFetchOffset (1/N) (#20207) The PR refactors the findNextFetchOffset variable from an AtomicBoolean to a plain boolean, as it is always accessed while holding a lock. This also improves handling of the `writeShareGroupState` method response, where the lock is now required only for a sub-section rather than for the complete handling. Reviewers: Abhinav Dixit, Andrew Schofield --- .../kafka/server/share/SharePartition.java | 59 +++++++++++-------- .../server/share/SharePartitionTest.java | 4 +- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index b063e090ae0..8a8e62b5d8d 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -79,7 +79,6 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -237,11 +236,6 @@ public class SharePartition { */ private final ReadWriteLock lock; - /** - * The find next fetch offset is used to indicate if the next fetch offset should be recomputed. - */ - private final AtomicBoolean findNextFetchOffset; - /** * The lock to ensure that the same share partition does not enter a fetch queue * while another one is being fetched within the queue. 
The caller's id that acquires the fetch @@ -275,6 +269,11 @@ public class SharePartition { */ private final int defaultRecordLockDurationMs; + /** + * The find next fetch offset is used to indicate if the next fetch offset should be recomputed. + */ + private boolean findNextFetchOffset; + /** * Timer is used to implement acquisition lock on records that guarantees the movement of records from * acquired to available/archived state upon timeout @@ -410,7 +409,7 @@ public class SharePartition { this.maxDeliveryCount = maxDeliveryCount; this.cachedState = new ConcurrentSkipListMap<>(); this.lock = new ReentrantReadWriteLock(); - this.findNextFetchOffset = new AtomicBoolean(false); + this.findNextFetchOffset = false; this.fetchLock = new AtomicReference<>(null); this.defaultRecordLockDurationMs = defaultRecordLockDurationMs; this.timer = timer; @@ -536,7 +535,7 @@ public class SharePartition { if (!cachedState.isEmpty()) { // If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records // in the cached state are not missed - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); endOffset = cachedState.lastEntry().getValue().lastOffset(); // initialReadGapOffset is not required, if there are no gaps in the read state response if (gapStartOffset != -1) { @@ -599,7 +598,7 @@ public class SharePartition { lock.writeLock().lock(); try { // When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false - if (!findNextFetchOffset.get()) { + if (!findNextFetchOffset) { if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset()) { // 1. When cachedState is empty, endOffset is set to the next offset of the last // offset removed from batch, which is the next offset to be fetched. @@ -618,7 +617,7 @@ public class SharePartition { // If cachedState is empty, there is no need of re-computing next fetch offset in future fetch requests. 
// Same case when startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO // and the cached state is fresh. - findNextFetchOffset.set(false); + updateFindNextFetchOffset(false); log.trace("The next fetch offset for the share partition {}-{} is {}", groupId, topicIdPartition, endOffset); return endOffset; } @@ -663,7 +662,7 @@ public class SharePartition { // If nextFetchOffset is -1, then no AVAILABLE records are found in the cachedState, so there is no need of // re-computing next fetch offset in future fetch requests if (nextFetchOffset == -1) { - findNextFetchOffset.set(false); + updateFindNextFetchOffset(false); nextFetchOffset = endOffset + 1; } log.trace("The next fetch offset for the share partition {}-{} is {}", groupId, topicIdPartition, nextFetchOffset); @@ -1064,7 +1063,7 @@ public class SharePartition { // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state. // This should not change the next fetch offset because the record is not available for acquisition if (updateResult.state != RecordState.ARCHIVED) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } } } @@ -1109,7 +1108,7 @@ public class SharePartition { // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state. // This should not change the next fetch offset because the record is not available for acquisition if (updateResult.state != RecordState.ARCHIVED) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } } return Optional.empty(); @@ -1144,7 +1143,7 @@ public class SharePartition { // If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED, // then there is a chance that the next fetch offset can change. if (anyRecordArchived) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } // The new startOffset will be the log start offset. 
@@ -1206,7 +1205,7 @@ public class SharePartition { // If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED, // then there is a chance that the next fetch offset can change. if (anyRecordArchived) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } } finally { lock.writeLock().unlock(); @@ -1944,7 +1943,7 @@ public class SharePartition { // This should not change the next fetch offset because the record is not available for acquisition if (recordState == RecordState.AVAILABLE && updateResult.state != RecordState.ARCHIVED) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } } } finally { @@ -2000,7 +1999,7 @@ public class SharePartition { // This should not change the nextFetchOffset because the record is not available for acquisition if (recordState == RecordState.AVAILABLE && updateResult.state != RecordState.ARCHIVED) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } } finally { lock.writeLock().unlock(); @@ -2086,11 +2085,11 @@ public class SharePartition { // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully. state.cancelAndClearAcquisitionLockTimeoutTask(); if (state.state == RecordState.AVAILABLE) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } }); // Update the cached state and start and end offsets after acknowledging/releasing the acquired records. - cacheStateUpdated = maybeUpdateCachedStateAndOffsets(); + cacheStateUpdated = maybeUpdateCachedStateAndOffsets(); future.complete(null); } finally { lock.writeLock().unlock(); @@ -2467,7 +2466,7 @@ public class SharePartition { // Cancel the acquisition lock timeout task for the batch since it is completed now. 
updateResult.cancelAndClearAcquisitionLockTimeoutTask(); if (updateResult.state != RecordState.ARCHIVED) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } return; } @@ -2514,7 +2513,7 @@ public class SharePartition { // Cancel the acquisition lock timeout task for the offset since it is completed now. updateResult.cancelAndClearAcquisitionLockTimeoutTask(); if (updateResult.state != RecordState.ARCHIVED) { - findNextFetchOffset.set(true); + updateFindNextFetchOffset(true); } } } @@ -2748,12 +2747,22 @@ public class SharePartition { // Visible for testing. boolean findNextFetchOffset() { - return findNextFetchOffset.get(); + lock.readLock().lock(); + try { + return findNextFetchOffset; + } finally { + lock.readLock().unlock(); + } } - // Visible for testing. Should only be used for testing purposes. - void findNextFetchOffset(boolean findNextOffset) { - findNextFetchOffset.getAndSet(findNextOffset); + // Visible for testing. + void updateFindNextFetchOffset(boolean value) { + lock.writeLock().lock(); + try { + findNextFetchOffset = value; + } finally { + lock.writeLock().unlock(); + } } // Visible for testing diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 53b1ee27919..c52b4e257d9 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -1564,7 +1564,7 @@ public class SharePartitionTest { @Test public void testNextFetchOffsetWithFindAndCachedStateEmpty() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - sharePartition.findNextFetchOffset(true); + sharePartition.updateFindNextFetchOffset(true); assertTrue(sharePartition.findNextFetchOffset()); assertEquals(0, sharePartition.nextFetchOffset()); assertFalse(sharePartition.findNextFetchOffset()); @@ -1573,7 +1573,7 @@ public class SharePartitionTest { 
@Test public void testNextFetchOffsetWithFindAndCachedState() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - sharePartition.findNextFetchOffset(true); + sharePartition.updateFindNextFetchOffset(true); assertTrue(sharePartition.findNextFetchOffset()); fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);