Upgrade Reactor to Dysprosium RC1

Closes: gh-23579
This commit is contained in:
Rossen Stoyanchev 2019-09-04 08:54:00 +01:00
parent f26866e4d4
commit 57d9b92b94
3 changed files with 42 additions and 22 deletions

View File

@ -32,7 +32,7 @@ configure(allprojects) { project ->
imports {
mavenBom "com.fasterxml.jackson:jackson-bom:2.9.9"
mavenBom "io.netty:netty-bom:4.1.39.Final"
mavenBom "io.projectreactor:reactor-bom:Dysprosium-M3"
mavenBom "io.projectreactor:reactor-bom:Dysprosium-RC1"
mavenBom "org.eclipse.jetty:jetty-bom:9.4.20.v20190813"
mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.50"
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.0"

View File

@ -854,15 +854,22 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Mono.<Object, ReactiveTransactionInfo>usingWhen(Mono.just(it), txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty())
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
return Mono
.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::commitTransactionAfterReturning)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception
@ -877,15 +884,22 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Flux.usingWhen(Mono.just(it), txInfo -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty())
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
return Flux
.usingWhen(
Mono.just(it),
txInfo -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, ex) -> Mono.empty(),
this::commitTransactionAfterReturning)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception

View File

@ -80,9 +80,15 @@ final class TransactionalOperatorImpl implements TransactionalOperator {
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMapMany(it -> Flux.usingWhen(Mono.just(it), action::doInTransaction,
this.transactionManager::commit, s -> Mono.empty())
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
return status.flatMapMany(it -> Flux
.usingWhen(
Mono.just(it),
action::doInTransaction,
this.transactionManager::commit,
(tx, ex) -> Mono.empty(),
this.transactionManager::commit)
.onErrorResume(ex ->
rollbackOnException(it, ex).then(Mono.error(ex))));
})
.subscriberContext(TransactionContextManager.getOrCreateContext())
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());