diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index fa80c230a58..e3b092c2b76 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -328,6 +328,12 @@ public class SharePartition { */ private final OffsetMetadata fetchOffsetMetadata; + /** + * The delayed share fetch key is used to track the delayed share fetch requests that are waiting + * for the respective share partition. + */ + private final DelayedShareFetchKey delayedShareFetchKey; + /** * The state epoch is used to track the version of the state of the share partition. */ @@ -414,6 +420,7 @@ public class SharePartition { this.replicaManager = replicaManager; this.groupConfigManager = groupConfigManager; this.fetchOffsetMetadata = new OffsetMetadata(); + this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition); this.listener = listener; this.sharePartitionMetrics = sharePartitionMetrics; this.registerGaugeMetrics(); @@ -551,6 +558,9 @@ public class SharePartition { } // Release the lock. lock.writeLock().unlock(); + // Avoid triggering the listener for waiting share fetch requests in purgatory as the + // share partition manager keeps track of same and will trigger the listener for the + // respective share partition. // Complete the future. if (isFailed) { future.completeExceptionally(throwable); @@ -2048,6 +2058,10 @@ public class SharePartition { } writeShareGroupState(stateBatches).whenComplete((result, exception) -> { + // There can be a pending delayed share fetch requests for the share partition which are waiting + // on the startOffset to move ahead, hence track if the state is updated in the cache. If + // yes, then notify the delayed share fetch purgatory to complete the pending requests. + boolean cacheStateUpdated = false; lock.writeLock().lock(); try { if (exception != null) { @@ -2066,27 +2080,31 @@ public class SharePartition { state.cancelAndClearAcquisitionLockTimeoutTask(); }); // Update the cached state and start and end offsets after acknowledging/releasing the acquired records. - maybeUpdateCachedStateAndOffsets(); + cacheStateUpdated = maybeUpdateCachedStateAndOffsets(); future.complete(null); } finally { lock.writeLock().unlock(); + // Maybe complete the delayed share fetch request if the state has been changed in cache + // which might have moved start offset ahead. Hence, the pending delayed share fetch + // request can be completed. The call should be made outside the lock to avoid deadlock. + maybeCompleteDelayedShareFetchRequest(cacheStateUpdated); } }); } - private void maybeUpdateCachedStateAndOffsets() { + private boolean maybeUpdateCachedStateAndOffsets() { lock.writeLock().lock(); try { if (!canMoveStartOffset()) { - return; + return false; } // This will help to find the next position for the startOffset. // The new position of startOffset will be lastOffsetAcknowledged + 1 long lastOffsetAcknowledged = findLastOffsetAcknowledged(); - // If lastOffsetAcknowledged is -1, this means we cannot move out startOffset ahead + // If lastOffsetAcknowledged is -1, this means we cannot move startOffset ahead if (lastOffsetAcknowledged == -1) { - return; + return false; } // This is true if all records in the cachedState have been acknowledged (either Accept or Reject). @@ -2097,7 +2115,7 @@ public class SharePartition { endOffset = lastCachedOffset + 1; cachedState.clear(); // Nothing further to do. - return; + return true; } /* @@ -2144,6 +2162,7 @@ public class SharePartition { if (lastKeyToRemove != -1) { cachedState.subMap(firstKeyToRemove, true, lastKeyToRemove, true).clear(); } + return true; } finally { lock.writeLock().unlock(); } @@ -2405,13 +2424,10 @@ public class SharePartition { lock.writeLock().unlock(); } + // If we have an acquisition lock timeout for a share-partition, then we should check if + // there is a pending share fetch request for the share-partition and complete it. // Skip null check for stateBatches, it should always be initialized if reached here. - if (!stateBatches.isEmpty()) { - // If we have an acquisition lock timeout for a share-partition, then we should check if - // there is a pending share fetch request for the share-partition and complete it. - DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()); - replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey); - } + maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty()); } private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch, @@ -2486,6 +2502,12 @@ public class SharePartition { } } + private void maybeCompleteDelayedShareFetchRequest(boolean shouldComplete) { + if (shouldComplete) { + replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey); + } + } + private long startOffsetDuringInitialization(long partitionDataStartOffset) { // Set the state epoch and end offset from the persisted state. if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) { diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index da35cb0f428..2f8438e7038 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; +import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.metrics.SharePartitionMetrics; import org.apache.kafka.server.share.persister.NoOpStatePersister; @@ -1670,7 +1671,11 @@ public class SharePartitionTest { @Test public void testAcknowledgeSingleRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withReplicaManager(replicaManager) + .withState(SharePartitionState.ACTIVE) + .build(); MemoryRecords records1 = memoryRecords(1, 0); MemoryRecords records2 = memoryRecords(1, 1); @@ -1693,11 +1698,18 @@ public class SharePartitionTest { assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(1L).batchState()); assertEquals(1, sharePartition.cachedState().get(1L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(1L).offsetState()); + // Should not invoke completeDelayedShareFetchRequest as the first offset is not acknowledged yet. + Mockito.verify(replicaManager, Mockito.times(0)) + .completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(GROUP_ID, TOPIC_ID_PARTITION)); } @Test public void testAcknowledgeMultipleRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withReplicaManager(replicaManager) + .withState(SharePartitionState.ACTIVE) + .build(); MemoryRecords records = memoryRecords(10, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 10); @@ -1711,6 +1723,9 @@ public class SharePartitionTest { assertEquals(15, sharePartition.nextFetchOffset()); assertEquals(0, sharePartition.cachedState().size()); + // Should invoke completeDelayedShareFetchRequest as the start offset is moved. + Mockito.verify(replicaManager, Mockito.times(1)) + .completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(GROUP_ID, TOPIC_ID_PARTITION)); } @Test diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java index 0fe1a4774f5..88dbabf6e5c 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.server.share.fetch; +import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import java.util.Objects; @@ -28,6 +29,10 @@ public class DelayedShareFetchGroupKey implements DelayedShareFetchKey { private final Uuid topicId; private final int partition; + public DelayedShareFetchGroupKey(String groupId, TopicIdPartition topicIdPartition) { + this(groupId, topicIdPartition.topicId(), topicIdPartition.partition()); + } + public DelayedShareFetchGroupKey(String groupId, Uuid topicId, int partition) { this.groupId = groupId; this.topicId = topicId;