From c1fc59fc23b529e2e6437c4c314fc31a62dd4b96 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Tue, 4 Mar 2025 16:04:45 +0000 Subject: [PATCH] KAFKA-18918: Correcting releasing of locks on exception (#19091) The PR corrects the way the locks are released on exception. As `partitionsAcquired` can be a reference to `topicPartitionData`, hence the locks should released prior clearing `partitionsAcquired`. Reviewers: Abhinav Dixit , Andrew Schofield --- .../kafka/server/share/DelayedShareFetch.java | 2 +- .../server/share/DelayedShareFetchTest.java | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 80399f4e05c..18797f3cc0c 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -277,9 +277,9 @@ public class DelayedShareFetch extends DelayedOperation { return false; } catch (Exception e) { log.error("Error processing delayed share fetch request", e); + releasePartitionLocks(topicPartitionData.keySet()); partitionsAcquired.clear(); partitionsAlreadyFetched.clear(); - releasePartitionLocks(topicPartitionData.keySet()); return forceComplete(); } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 8669474a80c..27aae04f176 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -86,6 +86,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -725,6 +726,7 @@ public class DelayedShareFetchTest { Mockito.verify(replicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); + Mockito.verify(sp0, times(1)).releaseFetchLock(); // Force complete the request as it's still pending. Return false from the share partition lock acquire. when(sp0.maybeAcquireFetchLock()).thenReturn(false); @@ -747,6 +749,44 @@ public class DelayedShareFetchTest { Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); } + @Test + public void testTryCompleteLocksReleasedOnCompleteException() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + LinkedHashMap partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0); + + SharePartition sp0 = mock(SharePartition.class); + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1); + + PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0)); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) + .build()); + assertFalse(delayedShareFetch.isCompleted()); + // Throw exception for onComplete. + doThrow(new RuntimeException()).when(delayedShareFetch).onComplete(); + // Try to complete the request. + assertFalse(delayedShareFetch.tryComplete()); + + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); + Mockito.verify(sp0, times(1)).releaseFetchLock(); + } + @Test public void testLocksReleasedForCompletedFetch() { String groupId = "grp";