diff --git a/build.gradle b/build.gradle index 5d441bbe254..da2f2481d90 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ configure(allprojects) { project -> imports { mavenBom "com.fasterxml.jackson:jackson-bom:2.9.9" mavenBom "io.netty:netty-bom:4.1.39.Final" - mavenBom "io.projectreactor:reactor-bom:Dysprosium-M3" + mavenBom "io.projectreactor:reactor-bom:Dysprosium-RC1" mavenBom "org.eclipse.jetty:jetty-bom:9.4.20.v20190813" mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.50" mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.0" diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index 8f227c54c7d..2d08d457923 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -854,15 +854,22 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> { try { // Need re-wrapping until we get hold of the exception through usingWhen. - return Mono.usingWhen(Mono.just(it), txInfo -> { - try { - return (Mono) invocation.proceedWithInvocation(); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }, this::commitTransactionAfterReturning, txInfo -> Mono.empty()) - .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); + return Mono + .usingWhen( + Mono.just(it), + txInfo -> { + try { + return (Mono) invocation.proceedWithInvocation(); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }, + this::commitTransactionAfterReturning, + (txInfo, err) -> Mono.empty(), + this::commitTransactionAfterReturning) + .onErrorResume(ex -> + completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); } catch (Throwable ex) { // target invocation exception @@ -877,15 +884,22 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> { try { // Need re-wrapping until we get hold of the exception through usingWhen. - return Flux.usingWhen(Mono.just(it), txInfo -> { - try { - return this.adapter.toPublisher(invocation.proceedWithInvocation()); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }, this::commitTransactionAfterReturning, txInfo -> Mono.empty()) - .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); + return Flux + .usingWhen( + Mono.just(it), + txInfo -> { + try { + return this.adapter.toPublisher(invocation.proceedWithInvocation()); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }, + this::commitTransactionAfterReturning, + (txInfo, ex) -> Mono.empty(), + this::commitTransactionAfterReturning) + .onErrorResume(ex -> + completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); } catch (Throwable ex) { // target invocation exception diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index 8929c2e3694..3162525a4ea 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -80,9 +80,15 @@ final class TransactionalOperatorImpl implements TransactionalOperator { // This will normally result in a target object being invoked. // Need re-wrapping of ReactiveTransaction until we get hold of the exception // through usingWhen. - return status.flatMapMany(it -> Flux.usingWhen(Mono.just(it), action::doInTransaction, - this.transactionManager::commit, s -> Mono.empty()) - .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); + return status.flatMapMany(it -> Flux + .usingWhen( + Mono.just(it), + action::doInTransaction, + this.transactionManager::commit, + (tx, ex) -> Mono.empty(), + this.transactionManager::commit) + .onErrorResume(ex -> + rollbackOnException(it, ex).then(Mono.error(ex)))); }) .subscriberContext(TransactionContextManager.getOrCreateContext()) .subscriberContext(TransactionContextManager.getOrCreateContextHolder());