Release R2DBC connection when cleanup fails in transaction
When using R2dbcTransactionManager, connection will not be released if it encounters error while doing `afterCleanup` steps. As `afterCleanup` can use a database connection when doing `setAutoCommit(true)`, it can fail under some conditions where the connection is not reliable. This leads to the Connection not being released. This commit ensures that inner steps of the `doCleanupAfterCompletion` are protected against errors, logging the errors and continuing the cleanup until the last step, which releases the connection. Closes gh-29703 Co-authored-by: Simon Baslé <sbasle@vmware.com>
This commit is contained in:
parent
eff1a1a664
commit
e050c37158
|
@ -357,26 +357,41 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
|
|||
Mono<Void> afterCleanup = Mono.empty();
|
||||
|
||||
if (txObject.isMustRestoreAutoCommit()) {
|
||||
afterCleanup = afterCleanup.then(Mono.from(con.setAutoCommit(true)));
|
||||
Mono<Void> restoreAutoCommitStep = safeCleanupStep(
|
||||
"doCleanupAfterCompletion when restoring autocommit", Mono.from(con.setAutoCommit(true)));
|
||||
afterCleanup = afterCleanup.then(restoreAutoCommitStep);
|
||||
}
|
||||
|
||||
return afterCleanup.then(Mono.defer(() -> {
|
||||
Mono<Void> releaseConnectionStep = Mono.defer(() -> {
|
||||
try {
|
||||
if (txObject.isNewConnectionHolder()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
|
||||
}
|
||||
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory());
|
||||
return safeCleanupStep("doCleanupAfterCompletion when releasing R2DBC Connection",
|
||||
ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()));
|
||||
}
|
||||
}
|
||||
finally {
|
||||
txObject.getConnectionHolder().clear();
|
||||
}
|
||||
return Mono.empty();
|
||||
}));
|
||||
});
|
||||
return afterCleanup.then(releaseConnectionStep);
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<Void> safeCleanupStep(String stepDescription, Mono<Void> stepMono) {
|
||||
if (!logger.isDebugEnabled()) {
|
||||
return stepMono.onErrorComplete();
|
||||
}
|
||||
else {
|
||||
return stepMono.doOnError(e ->
|
||||
logger.debug(String.format("Error ignored during %s: %s", stepDescription, e)))
|
||||
.onErrorComplete();
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<Void> switchAutoCommitIfNecessary(Connection con, Object transaction) {
|
||||
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
|
||||
Mono<Void> prepare = Mono.empty();
|
||||
|
|
|
@ -23,13 +23,16 @@ import io.r2dbc.spi.Connection;
|
|||
import io.r2dbc.spi.ConnectionFactory;
|
||||
import io.r2dbc.spi.IsolationLevel;
|
||||
import io.r2dbc.spi.R2dbcBadGrammarException;
|
||||
import io.r2dbc.spi.R2dbcTimeoutException;
|
||||
import io.r2dbc.spi.Statement;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.springframework.r2dbc.BadSqlGrammarException;
|
||||
import org.springframework.transaction.CannotCreateTransactionException;
|
||||
import org.springframework.transaction.IllegalTransactionStateException;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
|
@ -326,6 +329,34 @@ class R2dbcTransactionManagerUnitTests {
|
|||
verifyNoMoreInteractions(connectionMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void testConnectionReleasedWhenRollbackFails() {
|
||||
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty());
|
||||
|
||||
TransactionalOperator operator = TransactionalOperator.create(tm);
|
||||
|
||||
when(connectionMock.isAutoCommit()).thenReturn(true);
|
||||
when(connectionMock.setAutoCommit(true)).thenReturn(Mono.defer(() -> Mono.error(new R2dbcTimeoutException("SET AUTOCOMMIT = 1 timed out"))));
|
||||
when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty());
|
||||
when(connectionMock.setAutoCommit(false)).thenReturn(Mono.empty());
|
||||
|
||||
operator.execute(reactiveTransaction -> ConnectionFactoryUtils.getConnection(connectionFactoryMock)
|
||||
.doOnNext(connection -> {
|
||||
throw new IllegalStateException("Intentional error to trigger rollback");
|
||||
}).then()).as(StepVerifier::create)
|
||||
.verifyErrorSatisfies(e -> Assertions.assertThat(e)
|
||||
.isInstanceOf(BadSqlGrammarException.class)
|
||||
.hasCause(new R2dbcBadGrammarException("Rollback should fail"))
|
||||
);
|
||||
|
||||
verify(connectionMock).isAutoCommit();
|
||||
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
|
||||
verify(connectionMock, never()).commitTransaction();
|
||||
verify(connectionMock).rollbackTransaction();
|
||||
verify(connectionMock).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testTransactionSetRollbackOnly() {
|
||||
when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());
|
||||
|
|
Loading…
Reference in New Issue