mirror of https://github.com/apache/kafka.git
KAFKA-18027: MINOR: Correct DelayedOperationPurgatory code around adding of an already completed operation (#17842)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
This commit is contained in:
parent
c6294aacef
commit
aa7a3dbd30
|
@ -147,7 +147,10 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
|
||||||
// any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
|
// any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
|
||||||
// holding an exclusive lock to make the call is often unnecessary.
|
// holding an exclusive lock to make the call is often unnecessary.
|
||||||
if (operation.safeTryCompleteOrElse(() -> {
|
if (operation.safeTryCompleteOrElse(() -> {
|
||||||
watchKeys.forEach(key -> watchForOperation(key, operation));
|
watchKeys.forEach(key -> {
|
||||||
|
if (!operation.isCompleted())
|
||||||
|
watchForOperation(key, operation);
|
||||||
|
});
|
||||||
if (!watchKeys.isEmpty())
|
if (!watchKeys.isEmpty())
|
||||||
estimatedTotalOperations.incrementAndGet();
|
estimatedTotalOperations.incrementAndGet();
|
||||||
})) {
|
})) {
|
||||||
|
|
Loading…
Reference in New Issue