diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index ec8e22f11b..db608a86db 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -75,15 +75,13 @@ final class TransactionalOperatorImpl implements TransactionalOperator { public Flux execute(TransactionCallback action) throws TransactionException { return TransactionContextManager.currentContext().flatMapMany(context -> { Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); - return status.flatMapMany(it -> { - // This is an around advice: Invoke the next interceptor in the chain. - // This will normally result in a target object being invoked. - // Need re-wrapping of ReactiveTransaction until we get hold of the exception - // through usingWhen. - return Flux.usingWhen(Mono.just(it), action::doInTransaction, - this.transactionManager::commit, s -> Mono.empty()) - .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))); - }); + // This is an around advice: Invoke the next interceptor in the chain. + // This will normally result in a target object being invoked. + // Need re-wrapping of ReactiveTransaction until we get hold of the exception + // through usingWhen. + return status.flatMapMany(it -> Flux.usingWhen(Mono.just(it), action::doInTransaction, + this.transactionManager::commit, s -> Mono.empty()) + .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); }) .subscriberContext(TransactionContextManager.getOrCreateContext()) .subscriberContext(TransactionContextManager.getOrCreateContextHolder());