diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index b55c990fd4b..68ced23a5f7 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -2424,22 +2424,22 @@ public class SharePartition { releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset); } } - - if (!stateBatches.isEmpty()) { - writeShareGroupState(stateBatches).whenComplete((result, exception) -> { - if (exception != null) { - log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", - groupId, topicIdPartition, memberId, exception); - } - // Even if write share group state RPC call fails, we will still go ahead with the state transition. - // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. - maybeUpdateCachedStateAndOffsets(); - }); - } } finally { lock.writeLock().unlock(); } + if (!stateBatches.isEmpty()) { + writeShareGroupState(stateBatches).whenComplete((result, exception) -> { + if (exception != null) { + log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", + groupId, topicIdPartition, memberId, exception); + } + // Even if write share group state RPC call fails, we will still go ahead with the state transition. + // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. + maybeUpdateCachedStateAndOffsets(); + }); + } + // If we have an acquisition lock timeout for a share-partition, then we should check if // there is a pending share fetch request for the share-partition and complete it. // Skip null check for stateBatches, it should always be initialized if reached here.