Ensure reactive transaction rollback on commit error
This change fixes a situation where error handling was skipped during `processCommit()` in case the `doCommit()` failed. The error handling was set up via an `onErrorResume` operator that was nested inside a `then(...)`, applied to an inner `Mono.empty()`. As a consequence, it would never receive an error signal (effectively decoupling the onErrorResume from the main chain). This change simply moves the error handling back one level up. It also simplifies the `doCommit` code a bit by getting rid of the steps that artificially introduce a `Mono<Object>` return type, which is not really needed. A pre-existing test was missing the fact that the rollback didn't occur, which is now fixed. Another dedicated test is introduced building upon the `ReactiveTestTransactionManager` class. Closes gh-30096
This commit is contained in:
parent
2e5d0470dc
commit
9b50c0d590
|
|
@ -270,6 +270,7 @@ class R2dbcTransactionManagerUnitTests {
|
|||
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
|
||||
verify(connectionMock).createStatement("foo");
|
||||
verify(connectionMock).commitTransaction();
|
||||
verify(connectionMock).rollbackTransaction();
|
||||
verify(connectionMock).close();
|
||||
verifyNoMoreInteractions(connectionMock);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -433,7 +433,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
|
||||
AtomicBoolean beforeCompletionInvoked = new AtomicBoolean();
|
||||
|
||||
Mono<Object> commit = prepareForCommit(synchronizationManager, status)
|
||||
Mono<Void> commit = prepareForCommit(synchronizationManager, status)
|
||||
.then(triggerBeforeCommit(synchronizationManager, status))
|
||||
.then(triggerBeforeCompletion(synchronizationManager, status))
|
||||
.then(Mono.defer(() -> {
|
||||
|
|
@ -445,11 +445,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
return doCommit(synchronizationManager, status);
|
||||
}
|
||||
return Mono.empty();
|
||||
})).then(Mono.empty().onErrorResume(ex -> {
|
||||
Mono<Object> propagateException = Mono.error(ex);
|
||||
})) //
|
||||
.onErrorResume(ex -> {
|
||||
Mono<Void> propagateException = Mono.error(ex);
|
||||
// Store result in a local variable in order to appease the
|
||||
// Eclipse compiler with regard to inferred generics.
|
||||
Mono<Object> result = propagateException;
|
||||
Mono<Void> result = propagateException;
|
||||
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
|
||||
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
|
||||
.then(propagateException);
|
||||
|
|
@ -471,7 +472,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
}
|
||||
|
||||
return result;
|
||||
})).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
|
||||
})
|
||||
.then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
|
||||
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
|
||||
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))));
|
||||
|
||||
|
|
|
|||
|
|
@ -36,6 +36,8 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
|
||||
private final boolean canCreateTransaction;
|
||||
|
||||
private final boolean forceFailOnCommit;
|
||||
|
||||
protected boolean begin = false;
|
||||
|
||||
protected boolean commit = false;
|
||||
|
|
@ -48,8 +50,13 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
|
||||
|
||||
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) {
|
||||
this(existingTransaction, canCreateTransaction, false);
|
||||
}
|
||||
|
||||
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction, boolean forceFailOnCommit) {
|
||||
this.existingTransaction = existingTransaction;
|
||||
this.canCreateTransaction = canCreateTransaction;
|
||||
this.forceFailOnCommit = forceFailOnCommit;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -79,7 +86,12 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
if (!TRANSACTION.equals(status.getTransaction())) {
|
||||
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
|
||||
}
|
||||
return Mono.fromRunnable(() -> this.commit = true);
|
||||
return Mono.fromRunnable(() -> {
|
||||
this.commit = true;
|
||||
if (this.forceFailOnCommit) {
|
||||
throw new IllegalArgumentException("Forced failure on commit");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -203,6 +203,22 @@ public class ReactiveTransactionSupportTests {
|
|||
assertHasCleanedUp(tm);
|
||||
}
|
||||
|
||||
//gh-28968
|
||||
@Test
|
||||
void errorInCommitDoesInitiateRollbackAfterCommit() {
|
||||
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true, true);
|
||||
TransactionalOperator rxtx = TransactionalOperator.create(tm);
|
||||
|
||||
StepVerifier.create(rxtx.transactional(Mono.just("bar")))
|
||||
.verifyErrorMessage("Forced failure on commit");
|
||||
|
||||
assertHasBegan(tm);
|
||||
assertHasCommitted(tm);
|
||||
assertHasRolledBack(tm);
|
||||
assertHasNotSetRollbackOnly(tm);
|
||||
assertHasCleanedUp(tm);
|
||||
}
|
||||
|
||||
private void assertHasBegan(ReactiveTestTransactionManager actual) {
|
||||
assertThat(actual.begin).as("Expected <ReactiveTransactionManager.begin()> but was <begin()> was not invoked").isTrue();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue