diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java index 3491aee139e..380f22c9c8e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java +++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java @@ -147,7 +147,10 @@ public class DelayedOperationPurgatory { // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously, // holding an exclusive lock to make the call is often unnecessary. if (operation.safeTryCompleteOrElse(() -> { - watchKeys.forEach(key -> watchForOperation(key, operation)); + watchKeys.forEach(key -> { + if (!operation.isCompleted()) + watchForOperation(key, operation); + }); if (!watchKeys.isEmpty()) estimatedTotalOperations.incrementAndGet(); })) {