Defer triggerAfterCompletion invocation in doRollbackOnCommitException

Closes gh-34595
This commit is contained in:
Juergen Hoeller 2025-03-19 10:59:46 +01:00
parent 0141725638
commit cc986cd2e8
2 changed files with 35 additions and 12 deletions

View File

@ -23,6 +23,7 @@ import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import io.r2dbc.spi.Statement;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -32,6 +33,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.dao.TransientDataAccessResourceException;
import org.springframework.r2dbc.BadSqlGrammarException;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.IllegalTransactionStateException;
@ -315,6 +317,31 @@ class R2dbcTransactionManagerTests {
verify(connectionMock).close();
}
@Test
void testCommitAndRollbackFails() {
when(connectionMock.isAutoCommit()).thenReturn(false);
when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() ->
Mono.error(new R2dbcBadGrammarException("Commit should fail"))));
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() ->
Mono.error(new R2dbcTransientResourceException("Rollback should also fail"))));
TransactionalOperator operator = TransactionalOperator.create(tm);
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> connection.createStatement("foo")).then()
.as(operator::transactional)
.as(StepVerifier::create)
.verifyError(TransientDataAccessResourceException.class);
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).createStatement("foo");
verify(connectionMock).commitTransaction();
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
verifyNoMoreInteractions(connectionMock);
}
@Test
void testTransactionSetRollbackOnly() {
when(connectionMock.isAutoCommit()).thenReturn(false);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -494,21 +494,17 @@ public abstract class AbstractReactiveTransactionManager
}));
}
else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
Mono<Void> mono;
Mono<Void> mono = Mono.empty();
if (!beforeCompletionInvoked.get()) {
mono = triggerBeforeCompletion(synchronizationManager, status);
}
else {
mono = Mono.empty();
}
result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex))
.then(propagateException);
}
return result;
})
.then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))
.then(Mono.defer(() -> {
if (status.isNewTransaction()) {
@ -518,8 +514,8 @@ public abstract class AbstractReactiveTransactionManager
}))));
return commit
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status)
.then(Mono.error(ex))).then(cleanupAfterCompletion(synchronizationManager, status));
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex)))
.then(cleanupAfterCompletion(synchronizationManager, status));
}
/**
@ -571,8 +567,8 @@ public abstract class AbstractReactiveTransactionManager
}
return beforeCompletion;
}
})).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion(
synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
})).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex ->
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
.then(Mono.defer(() -> {
if (status.isNewTransaction()) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, ex));
@ -623,7 +619,7 @@ public abstract class AbstractReactiveTransactionManager
return Mono.empty();
}))
.then(Mono.error(rbex));
}).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK))
}).then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)))
.then(Mono.defer(() -> {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
return Mono.empty();