Consistently release savepoint after nested transaction

Closes gh-31133
This commit is contained in:
Juergen Hoeller 2023-09-11 17:36:00 +02:00
parent 11dc11e989
commit 2880e6fba5
2 changed files with 145 additions and 68 deletions

View File

@ -337,19 +337,20 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return Mono.defer(() -> { return Mono.defer(() -> {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction; ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
if (txObject.hasSavepoint()) {
// Just release the savepoint, keeping the transactional connection.
return txObject.releaseSavepoint();
}
// Remove the connection holder from the context, if exposed. // Remove the connection holder from the context, if exposed.
if (txObject.isNewConnectionHolder()) { if (txObject.isNewConnectionHolder()) {
synchronizationManager.unbindResource(obtainConnectionFactory()); synchronizationManager.unbindResource(obtainConnectionFactory());
} }
// Reset connection. // Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
Mono<Void> afterCleanup = Mono.empty();
Mono<Void> releaseConnectionStep = Mono.defer(() -> {
try { try {
if (txObject.isNewConnectionHolder()) { if (txObject.isNewConnectionHolder()) {
Connection con = txObject.getConnectionHolder().getConnection();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Releasing R2DBC Connection [" + con + "] after transaction"); logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
} }
@ -364,10 +365,9 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
finally { finally {
txObject.getConnectionHolder().clear(); txObject.getConnectionHolder().clear();
} }
return Mono.empty(); return Mono.empty();
}); });
return afterCleanup.then(releaseConnectionStep);
});
} }
/** /**
@ -511,23 +511,36 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return (this.connectionHolder != null && this.connectionHolder.isTransactionActive()); return (this.connectionHolder != null && this.connectionHolder.isTransactionActive());
} }
public boolean hasSavepoint() {
return (this.savepointName != null);
}
public Mono<Void> createSavepoint() { public Mono<Void> createSavepoint() {
ConnectionHolder holder = getConnectionHolder(); ConnectionHolder holder = getConnectionHolder();
this.savepointName = holder.nextSavepoint(); String currentSavepoint = holder.nextSavepoint();
return Mono.from(holder.getConnection().createSavepoint(this.savepointName)); this.savepointName = currentSavepoint;
return Mono.from(holder.getConnection().createSavepoint(currentSavepoint));
}
public Mono<Void> releaseSavepoint() {
String currentSavepoint = this.savepointName;
if (currentSavepoint == null) {
return Mono.empty();
}
this.savepointName = null;
return Mono.from(getConnectionHolder().getConnection().releaseSavepoint(currentSavepoint));
} }
public Mono<Void> commit() { public Mono<Void> commit() {
Connection connection = getConnectionHolder().getConnection(); return (hasSavepoint() ? Mono.empty() :
return (this.savepointName != null ? Mono.from(getConnectionHolder().getConnection().commitTransaction()));
Mono.from(connection.releaseSavepoint(this.savepointName)) :
Mono.from(connection.commitTransaction()));
} }
public Mono<Void> rollback() { public Mono<Void> rollback() {
Connection connection = getConnectionHolder().getConnection(); Connection connection = getConnectionHolder().getConnection();
return (this.savepointName != null ? String currentSavepoint = this.savepointName;
Mono.from(connection.rollbackTransactionToSavepoint(this.savepointName)) : return (currentSavepoint != null ?
Mono.from(connection.rollbackTransactionToSavepoint(currentSavepoint)) :
Mono.from(connection.rollbackTransaction())); Mono.from(connection.rollbackTransaction()));
} }

View File

@ -27,6 +27,8 @@ import io.r2dbc.spi.Statement;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -44,6 +46,7 @@ import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.inOrder;
import static org.mockito.BDDMockito.mock; import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.never; import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.reset; import static org.mockito.BDDMockito.reset;
@ -365,53 +368,110 @@ class R2dbcTransactionManagerUnitTests {
@Test @Test
void testPropagationNestedWithExistingTransaction() { void testPropagationNestedWithExistingTransaction() {
when(connectionMock.createSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); when(connectionMock.createSavepoint(anyString())).thenReturn(Mono.empty());
when(connectionMock.releaseSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); when(connectionMock.rollbackTransactionToSavepoint(anyString())).thenReturn(Mono.empty());
when(connectionMock.releaseSavepoint(anyString())).thenReturn(Mono.empty());
when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionalOperator operator = TransactionalOperator.create(tm, definition); TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> { operator.execute(tx -> {
assertThat(tx1.isNewTransaction()).isTrue(); assertThat(tx.isNewTransaction()).isTrue();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
return operator.execute(tx2 -> { return Flux.concat(
assertThat(tx2.isNewTransaction()).isTrue(); TransactionalOperator.create(tm, definition).execute(ntx1 -> {
assertThat(ntx1.isNewTransaction()).as("ntx1.isNewTransaction()").isTrue();
assertThat(ntx1.isRollbackOnly()).as("ntx1.isRollbackOnly()").isFalse();
return Mono.empty(); return Mono.empty();
}); }),
TransactionalOperator.create(tm, definition).execute(ntx2 -> {
assertThat(ntx2.isNewTransaction()).as("ntx2.isNewTransaction()").isTrue();
assertThat(ntx2.isRollbackOnly()).as("ntx2.isRollbackOnly()").isFalse();
ntx2.setRollbackOnly();
assertThat(ntx2.isRollbackOnly()).isTrue();
return Mono.empty();
}),
TransactionalOperator.create(tm, definition).execute(ntx3 -> {
assertThat(ntx3.isNewTransaction()).as("ntx3.isNewTransaction()").isTrue();
assertThat(ntx3.isRollbackOnly()).as("ntx3.isRollbackOnly()").isFalse();
return Mono.empty();
}),
TransactionalOperator.create(tm, definition).execute(ntx4 -> {
assertThat(ntx4.isNewTransaction()).as("ntx4.isNewTransaction()").isTrue();
assertThat(ntx4.isRollbackOnly()).as("ntx4.isRollbackOnly()").isFalse();
ntx4.setRollbackOnly();
assertThat(ntx4.isRollbackOnly()).isTrue();
return Flux.concat(
TransactionalOperator.create(tm, definition).execute(ntx4n1 -> {
assertThat(ntx4n1.isNewTransaction()).as("ntx4n1.isNewTransaction()").isTrue();
assertThat(ntx4n1.isRollbackOnly()).as("ntx4n1.isRollbackOnly()").isFalse();
return Mono.empty();
}),
TransactionalOperator.create(tm, definition).execute(ntx4n2 -> {
assertThat(ntx4n2.isNewTransaction()).as("ntx4n2.isNewTransaction()").isTrue();
assertThat(ntx4n2.isRollbackOnly()).as("ntx4n2.isRollbackOnly()").isFalse();
ntx4n2.setRollbackOnly();
assertThat(ntx4n2.isRollbackOnly()).isTrue();
return Mono.empty();
})
);
}),
TransactionalOperator.create(tm, definition).execute(ntx5 -> {
assertThat(ntx5.isNewTransaction()).as("ntx5.isNewTransaction()").isTrue();
assertThat(ntx5.isRollbackOnly()).as("ntx5.isRollbackOnly()").isFalse();
ntx5.setRollbackOnly();
assertThat(ntx5.isRollbackOnly()).isTrue();
return Flux.concat(
TransactionalOperator.create(tm, definition).execute(ntx5n1 -> {
assertThat(ntx5n1.isNewTransaction()).as("ntx5n1.isNewTransaction()").isTrue();
assertThat(ntx5n1.isRollbackOnly()).as("ntx5n1.isRollbackOnly()").isFalse();
return Mono.empty();
}),
TransactionalOperator.create(tm, definition).execute(ntx5n2 -> {
assertThat(ntx5n2.isNewTransaction()).as("ntx5n2.isNewTransaction()").isTrue();
assertThat(ntx5n2.isRollbackOnly()).as("ntx5n2.isRollbackOnly()").isFalse();
ntx5n2.setRollbackOnly();
assertThat(ntx5n2.isRollbackOnly()).isTrue();
return Mono.empty();
})
);
})
);
}).as(StepVerifier::create).verifyComplete(); }).as(StepVerifier::create).verifyComplete();
verify(connectionMock).createSavepoint("SAVEPOINT_1"); InOrder inOrder = inOrder(connectionMock);
verify(connectionMock).releaseSavepoint("SAVEPOINT_1"); // ntx1
verify(connectionMock).commitTransaction(); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_1");
verify(connectionMock).close(); inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_1");
} // ntx2
inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_2");
@Test inOrder.verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_2");
void testPropagationNestedWithExistingTransactionAndRollback() { inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_2");
when(connectionMock.createSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); // ntx3
when(connectionMock.rollbackTransactionToSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_3");
when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_3");
// ntx4
DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_4");
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_5");
inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_5");
TransactionalOperator operator = TransactionalOperator.create(tm, definition); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_6");
operator.execute(tx1 -> { inOrder.verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_6");
assertThat(tx1.isNewTransaction()).isTrue(); inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_6");
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED); inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_4");
return operator.execute(tx2 -> { // ntx5
assertThat(tx2.isNewTransaction()).isTrue(); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_7");
tx2.setRollbackOnly(); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_8");
return Mono.empty(); inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_8");
}); inOrder.verify(connectionMock).createSavepoint("SAVEPOINT_9");
}).as(StepVerifier::create).verifyComplete(); inOrder.verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_9");
inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_9");
verify(connectionMock).createSavepoint("SAVEPOINT_1"); inOrder.verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_7");
verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_1"); inOrder.verify(connectionMock).releaseSavepoint("SAVEPOINT_7");
verify(connectionMock).commitTransaction(); // tx
verify(connectionMock).close(); inOrder.verify(connectionMock).commitTransaction();
inOrder.verify(connectionMock).close();
} }
@Test @Test
@ -452,7 +512,9 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> { return inner.execute(tx2 -> {
assertThat(tx2.isNewTransaction()).isTrue(); assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isRollbackOnly()).isFalse();
tx2.setRollbackOnly(); tx2.setRollbackOnly();
assertThat(tx2.isRollbackOnly()).isTrue();
return Mono.empty(); return Mono.empty();
}); });
}).as(StepVerifier::create).verifyComplete(); }).as(StepVerifier::create).verifyComplete();
@ -499,7 +561,9 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> { return inner.execute(tx2 -> {
assertThat(tx2.isNewTransaction()).isTrue(); assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isRollbackOnly()).isFalse();
tx2.setRollbackOnly(); tx2.setRollbackOnly();
assertThat(tx2.isRollbackOnly()).isTrue();
return Mono.empty(); return Mono.empty();
}); });
}).as(StepVerifier::create).verifyComplete(); }).as(StepVerifier::create).verifyComplete();