Introduce TransactionExecutionListener with begin/commit/rollback notifications

Includes ConfigurableTransactionManager interface for listener registration.
Includes additional introspection methods on TransactionExecution interface.
Includes default method declarations on TransactionStatus/SmartTransactionObject.

Closes gh-27479
This commit is contained in:
Juergen Hoeller 2023-08-06 14:03:44 +02:00
parent eaf54b54c3
commit 3253d2de89
22 changed files with 1372 additions and 481 deletions

View File

@ -131,11 +131,6 @@ public abstract class JdbcTransactionObjectSupport implements SavepointManager,
return this.savepointAllowed;
}
@Override
public void flush() {
// no-op
}
//---------------------------------------------------------------------
// Implementation of SavepointManager

View File

@ -448,11 +448,6 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
public boolean isRollbackOnly() {
return (this.resourceHolder != null && this.resourceHolder.isRollbackOnly());
}
@Override
public void flush() {
// no-op
}
}
}

View File

@ -88,8 +88,7 @@ class R2dbcTransactionManagerUnitTests {
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.flatMap(connection -> TransactionSynchronizationManager.forCurrentTransaction()
.doOnNext(synchronizationManager -> synchronizationManager.registerSynchronization(
sync)))
.doOnNext(synchronizationManager -> synchronizationManager.registerSynchronization(sync)))
.as(operator::transactional)
.as(StepVerifier::create)
.expectNextCount(1)
@ -120,12 +119,11 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(
operator::transactional)
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.as(operator::transactional)
.as(StepVerifier::create)
.expectErrorSatisfies(actual -> assertThat(actual).isInstanceOf(
CannotCreateTransactionException.class).hasCauseInstanceOf(
R2dbcBadGrammarException.class))
CannotCreateTransactionException.class).hasCauseInstanceOf(R2dbcBadGrammarException.class))
.verify();
}
@ -140,12 +138,16 @@ class R2dbcTransactionManagerUnitTests {
definition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(
operator::transactional)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
operator.execute(tx -> {
assertThat(tx.getTransactionName()).isEqualTo("my-transaction");
assertThat(tx.hasTransaction()).isTrue();
assertThat(tx.isNewTransaction()).isTrue();
assertThat(tx.isNested()).isFalse();
assertThat(tx.isReadOnly()).isTrue();
assertThat(tx.isRollbackOnly()).isFalse();
assertThat(tx.isCompleted()).isFalse();
return Mono.empty();
}).as(StepVerifier::create).verifyComplete();
ArgumentCaptor<io.r2dbc.spi.TransactionDefinition> txCaptor = ArgumentCaptor.forClass(io.r2dbc.spi.TransactionDefinition.class);
verify(connectionMock).beginTransaction(txCaptor.capture());
@ -171,8 +173,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(
operator::transactional)
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.as(operator::transactional)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
@ -190,8 +192,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(
operator::transactional)
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.as(operator::transactional)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
@ -215,8 +217,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(
operator::transactional)
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.as(operator::transactional)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
@ -262,11 +264,9 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm);
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> {
throw new IllegalStateException();
}).as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalStateException.class);
.doOnNext(connection -> { throw new IllegalStateException(); })
.as(operator::transactional)
.as(StepVerifier::create).verifyError(IllegalStateException.class);
assertThat(commits).hasValue(0);
assertThat(rollbacks).hasValue(1);
@ -286,8 +286,7 @@ class R2dbcTransactionManagerUnitTests {
reactiveTransaction.setRollbackOnly();
return ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> connection.createStatement("foo")).then();
}).as(StepVerifier::create)
.verifyError(BadSqlGrammarException.class);
}).as(StepVerifier::create).verifyError(BadSqlGrammarException.class);
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).createStatement("foo");
@ -308,7 +307,7 @@ class R2dbcTransactionManagerUnitTests {
.doOnNext(connection -> {
throw new IllegalStateException("Intentional error to trigger rollback");
}).then()).as(StepVerifier::create)
.verifyErrorSatisfies(e -> assertThat(e)
.verifyErrorSatisfies(ex -> assertThat(ex)
.isInstanceOf(BadSqlGrammarException.class)
.hasCause(new R2dbcBadGrammarException("Rollback should fail"))
);
@ -327,15 +326,21 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm);
operator.execute(tx -> {
tx.setRollbackOnly();
assertThat(tx.getTransactionName()).isEmpty();
assertThat(tx.hasTransaction()).isTrue();
assertThat(tx.isNewTransaction()).isTrue();
assertThat(tx.isNested()).isFalse();
assertThat(tx.isReadOnly()).isFalse();
assertThat(tx.isRollbackOnly()).isFalse();
tx.setRollbackOnly();
assertThat(tx.isRollbackOnly()).isTrue();
assertThat(tx.isCompleted()).isFalse();
return TransactionSynchronizationManager.forCurrentTransaction().doOnNext(
synchronizationManager -> {
assertThat(synchronizationManager.hasResource(connectionFactoryMock)).isTrue();
synchronizationManager.registerSynchronization(sync);
}).then();
}).as(StepVerifier::create)
.verifyComplete();
}).as(StepVerifier::create).verifyComplete();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).rollbackTransaction();
@ -357,14 +362,19 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
assertThat(tx1.getTransactionName()).isEmpty();
assertThat(tx1.hasTransaction()).isTrue();
assertThat(tx1.isNewTransaction()).isTrue();
assertThat(tx1.isNested()).isFalse();
assertThat(tx1.isReadOnly()).isFalse();
assertThat(tx1.isRollbackOnly()).isFalse();
assertThat(tx1.isCompleted()).isFalse();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NEVER);
return operator.execute(tx2 -> {
fail("Should have thrown IllegalTransactionStateException");
return Mono.empty();
});
}).as(StepVerifier::create)
.verifyError(IllegalTransactionStateException.class);
}).as(StepVerifier::create).verifyError(IllegalTransactionStateException.class);
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
@ -381,14 +391,17 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
assertThat(tx1.isNewTransaction()).isTrue();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
return operator.execute(tx2 -> {
assertThat(tx2.isNewTransaction()).isTrue();
return Mono.empty();
});
}).as(StepVerifier::create)
.verifyComplete();
assertThat(tx1.hasTransaction()).isTrue();
assertThat(tx1.isNewTransaction()).isTrue();
assertThat(tx1.isNested()).isFalse();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
return operator.execute(tx2 -> {
assertThat(tx2.hasTransaction()).isTrue();
assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isNested()).isTrue();
return Mono.empty();
});
}).as(StepVerifier::create).verifyComplete();
verify(connectionMock).createSavepoint("SAVEPOINT_1");
verify(connectionMock).releaseSavepoint("SAVEPOINT_1");
@ -407,15 +420,20 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
assertThat(tx1.isNewTransaction()).isTrue();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
return operator.execute(tx2 -> {
assertThat(tx2.isNewTransaction()).isTrue();
tx2.setRollbackOnly();
return Mono.empty();
});
}).as(StepVerifier::create)
.verifyComplete();
assertThat(tx1.hasTransaction()).isTrue();
assertThat(tx1.isNewTransaction()).isTrue();
assertThat(tx1.isNested()).isFalse();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
return operator.execute(tx2 -> {
assertThat(tx2.hasTransaction()).isTrue();
assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isNested()).isTrue();
assertThat(tx2.isRollbackOnly()).isFalse();
tx2.setRollbackOnly();
assertThat(tx2.isRollbackOnly()).isTrue();
return Mono.empty();
});
}).as(StepVerifier::create).verifyComplete();
verify(connectionMock).createSavepoint("SAVEPOINT_1");
verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_1");
@ -432,16 +450,19 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
assertThat(tx1.isNewTransaction()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> {
assertThat(tx2.isNewTransaction()).isTrue();
return Mono.empty();
});
}).as(StepVerifier::create)
.verifyComplete();
assertThat(tx1.hasTransaction()).isFalse();
assertThat(tx1.isNewTransaction()).isFalse();
assertThat(tx1.isNested()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> {
assertThat(tx2.hasTransaction()).isTrue();
assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isNested()).isFalse();
return Mono.empty();
});
}).as(StepVerifier::create).verifyComplete();
verify(connectionMock).commitTransaction();
verify(connectionMock).close();
@ -456,17 +477,22 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
assertThat(tx1.isNewTransaction()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> {
assertThat(tx2.isNewTransaction()).isTrue();
tx2.setRollbackOnly();
return Mono.empty();
});
}).as(StepVerifier::create)
.verifyComplete();
assertThat(tx1.hasTransaction()).isFalse();
assertThat(tx1.isNewTransaction()).isFalse();
assertThat(tx1.isNested()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> {
assertThat(tx2.hasTransaction()).isTrue();
assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isNested()).isFalse();
assertThat(tx2.isRollbackOnly()).isFalse();
tx2.setRollbackOnly();
assertThat(tx2.isRollbackOnly()).isTrue();
return Mono.empty();
});
}).as(StepVerifier::create).verifyComplete();
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
@ -481,16 +507,19 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
assertThat(tx1.hasTransaction()).isFalse();
assertThat(tx1.isNewTransaction()).isFalse();
assertThat(tx1.isNested()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> {
assertThat(tx2.hasTransaction()).isTrue();
assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isNested()).isFalse();
return Mono.empty();
});
}).as(StepVerifier::create)
.verifyComplete();
}).as(StepVerifier::create).verifyComplete();
verify(connectionMock).commitTransaction();
verify(connectionMock).close();
@ -505,17 +534,22 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
assertThat(tx1.isNewTransaction()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> {
assertThat(tx2.isNewTransaction()).isTrue();
tx2.setRollbackOnly();
return Mono.empty();
});
}).as(StepVerifier::create)
.verifyComplete();
assertThat(tx1.hasTransaction()).isFalse();
assertThat(tx1.isNewTransaction()).isFalse();
assertThat(tx1.isNested()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
return inner.execute(tx2 -> {
assertThat(tx2.hasTransaction()).isTrue();
assertThat(tx2.isNewTransaction()).isTrue();
assertThat(tx2.isNested()).isFalse();
assertThat(tx2.isRollbackOnly()).isFalse();
tx2.setRollbackOnly();
assertThat(tx2.isRollbackOnly()).isTrue();
return Mono.empty();
});
}).as(StepVerifier::create).verifyComplete();
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();

View File

@ -0,0 +1,54 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction;
import java.util.Collection;
/**
* Common configuration interface for transaction manager implementations.
* Provides registration facilities for {@link TransactionExecutionListener}.
*
* @author Juergen Hoeller
* @since 6.1
* @see PlatformTransactionManager
* @see ReactiveTransactionManager
*/
public interface ConfigurableTransactionManager extends TransactionManager {
/**
* Set the transaction execution listeners for begin/commit/rollback callbacks
* from this transaction manager.
* @see #addListener
*/
void setTransactionExecutionListeners(Collection<TransactionExecutionListener> listeners);
/**
* Return the registered transaction execution listeners for this transaction manager.
* @see #setTransactionExecutionListeners
*/
Collection<TransactionExecutionListener> getTransactionExecutionListeners();
/**
* Conveniently register the given listener for begin/commit/rollback callbacks
* from this transaction manager.
* @see #getTransactionExecutionListeners()
*/
default void addListener(TransactionExecutionListener listener) {
getTransactionExecutionListeners().add(listener);
}
}

View File

@ -42,6 +42,7 @@ import org.springframework.lang.Nullable;
* @see org.springframework.transaction.support.TransactionTemplate
* @see org.springframework.transaction.interceptor.TransactionInterceptor
* @see org.springframework.transaction.ReactiveTransactionManager
* @see ConfigurableTransactionManager
*/
public interface PlatformTransactionManager extends TransactionManager {
@ -68,8 +69,7 @@ public interface PlatformTransactionManager extends TransactionManager {
* @see TransactionDefinition#getTimeout
* @see TransactionDefinition#isReadOnly
*/
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
/**
* Commit the given transaction, with regard to its status. If the transaction

View File

@ -17,7 +17,7 @@
package org.springframework.transaction;
/**
* Representation of an ongoing reactive transaction.
* Representation of an ongoing {@link ReactiveTransactionManager} transaction.
* This is currently a marker interface extending {@link TransactionExecution}
* but may acquire further methods in a future revision.
*

View File

@ -32,6 +32,7 @@ import org.springframework.lang.Nullable;
* @see org.springframework.transaction.reactive.TransactionalOperator
* @see org.springframework.transaction.interceptor.TransactionInterceptor
* @see org.springframework.transaction.PlatformTransactionManager
* @see ConfigurableTransactionManager
*/
public interface ReactiveTransactionManager extends TransactionManager {

View File

@ -19,7 +19,8 @@ package org.springframework.transaction;
/**
* Common representation of the current state of a transaction.
* Serves as base interface for {@link TransactionStatus} as well as
* {@link ReactiveTransaction}.
* {@link ReactiveTransaction}, and as of 6.1 also as transaction
* representation for {@link TransactionExecutionListener}.
*
* @author Juergen Hoeller
* @since 5.2
@ -27,29 +28,102 @@ package org.springframework.transaction;
public interface TransactionExecution {
/**
* Return whether the present transaction is new; otherwise participating
* in an existing transaction, or potentially not running in an actual
* transaction in the first place.
* Return the defined name of the transaction (possibly an empty String).
* <p>In case of Spring's declarative transactions, the exposed name will be
* the {@code fully-qualified class name + "." + method name} (by default).
* <p>The default implementation returns an empty String.
* @since 6.1
* @see TransactionDefinition#getName()
*/
boolean isNewTransaction();
default String getTransactionName() {
return "";
}
/**
* Return whether there is an actual transaction active: this is meant to cover
* a new transaction as well as participation in an existing transaction, only
* returning {@code false} when not running in an actual transaction at all.
* <p>The default implementation returns {@code true}.
* @since 6.1
* @see #isNewTransaction()
* @see #isNested()
* @see #isReadOnly()
*/
default boolean hasTransaction() {
return true;
}
/**
* Return whether the transaction manager considers the present transaction
* as new; otherwise participating in an existing transaction, or potentially
* not running in an actual transaction in the first place.
* <p>This is primarily here for transaction manager state handling.
* Prefer the use of {@link #hasTransaction()} for application purposes
* since this is usually semantically appropriate.
* <p>The "new" status can be transaction manager specific, e.g. returning
* {@code true} for an actual nested transaction but potentially {@code false}
* for a savepoint-based nested transaction scope if the savepoint management
* is explicitly exposed (such as on {@link TransactionStatus}). A combined
* check for any kind of nested execution is provided by {@link #isNested()}.
* <p>The default implementation returns {@code true}.
* @see #hasTransaction()
* @see #isNested()
* @see TransactionStatus#hasSavepoint()
*/
default boolean isNewTransaction() {
return true;
}
/**
* Return if this transaction executes in a nested fashion within another.
* <p>The default implementation returns {@code false}.
* @since 6.1
* @see #hasTransaction()
* @see #isNewTransaction()
* @see TransactionDefinition#PROPAGATION_NESTED
*/
default boolean isNested() {
return false;
}
/**
* Return if this transaction is defined as read-only transaction.
* <p>The default implementation returns {@code false}.
* @since 6.1
* @see TransactionDefinition#isReadOnly()
*/
default boolean isReadOnly() {
return false;
}
/**
* Set the transaction rollback-only. This instructs the transaction manager
* that the only possible outcome of the transaction may be a rollback, as
* alternative to throwing an exception which would in turn trigger a rollback.
* <p>The default implementation throws an UnsupportedOperationException.
* @see #isRollbackOnly()
*/
void setRollbackOnly();
default void setRollbackOnly() {
throw new UnsupportedOperationException("setRollbackOnly not supported");
}
/**
* Return whether the transaction has been marked as rollback-only
* (either by the application or by the transaction infrastructure).
* <p>The default implementation returns {@code false}.
* @see #setRollbackOnly()
*/
boolean isRollbackOnly();
default boolean isRollbackOnly() {
return false;
}
/**
* Return whether this transaction is completed, that is,
* whether it has already been committed or rolled back.
* <p>The default implementation returns {@code false}.
*/
boolean isCompleted();
default boolean isCompleted() {
return false;
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction;
import org.springframework.lang.Nullable;
/**
* Callback interface for stateless listening to transaction creation/completion steps
* in a transaction manager. This is primarily meant for observation and statistics;
* consider stateful transaction synchronizations for resource management purposes.
*
* <p>In contrast to synchronizations, the transaction execution listener contract is
* commonly supported for thread-bound transactions as well as reactive transactions.
* The callback-provided {@link TransactionExecution} object will be either a
* {@link TransactionStatus} (for a {@link PlatformTransactionManager} transaction) or
* a {@link ReactiveTransaction} (for a {@link ReactiveTransactionManager} transaction).
*
* @author Juergen Hoeller
* @since 6.1
* @see ConfigurableTransactionManager#addListener
* @see org.springframework.transaction.support.TransactionSynchronizationManager#registerSynchronization
* @see org.springframework.transaction.reactive.TransactionSynchronizationManager#registerSynchronization
*/
public interface TransactionExecutionListener {
/**
* Callback before the transaction begin step.
* @param transaction the current transaction
*/
default void beforeBegin(TransactionExecution transaction) {
}
/**
* Callback after the transaction begin step.
* @param transaction the current transaction
* @param beginFailure an exception occurring during begin
* (or {@code null} after a successful begin step)
*/
default void afterBegin(TransactionExecution transaction, @Nullable Throwable beginFailure) {
}
/**
* Callback before the transaction commit step.
* @param transaction the current transaction
*/
default void beforeCommit(TransactionExecution transaction) {
}
/**
* Callback after the transaction commit step.
* @param transaction the current transaction
* @param commitFailure an exception occurring during commit
* (or {@code null} after a successful commit step)
*/
default void afterCommit(TransactionExecution transaction, @Nullable Throwable commitFailure) {
}
/**
* Callback before the transaction rollback step.
* @param transaction the current transaction
*/
default void beforeRollback(TransactionExecution transaction) {
}
/**
* Callback after the transaction rollback step.
* @param transaction the current transaction
* @param rollbackFailure an exception occurring during rollback
* (or {@code null} after a successful rollback step)
*/
default void afterRollback(TransactionExecution transaction, @Nullable Throwable rollbackFailure) {
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,7 +19,8 @@ package org.springframework.transaction;
import java.io.Flushable;
/**
* Representation of the status of a transaction.
* Representation of an ongoing {@link PlatformTransactionManager} transaction.
* Extends the common {@link TransactionExecution} interface.
*
* <p>Transactional code can use this to retrieve status information,
* and to programmatically request a rollback (instead of throwing
@ -44,12 +45,15 @@ public interface TransactionStatus extends TransactionExecution, SavepointManage
* <p>This method is mainly here for diagnostic purposes, alongside
* {@link #isNewTransaction()}. For programmatic handling of custom
* savepoints, use the operations provided by {@link SavepointManager}.
* <p>The default implementation returns {@code false}.
* @see #isNewTransaction()
* @see #createSavepoint()
* @see #rollbackToSavepoint(Object)
* @see #releaseSavepoint(Object)
*/
boolean hasSavepoint();
default boolean hasSavepoint() {
return false;
}
/**
* Flush the underlying session to the datastore, if applicable:
@ -58,8 +62,10 @@ public interface TransactionStatus extends TransactionExecution, SavepointManage
* transaction manager does not have a flush concept. A flush signal may
* get applied to the primary resource or to transaction synchronizations,
* depending on the underlying resource.
* <p>The default implementation is empty, considering flush as a no-op.
*/
@Override
void flush();
default void flush() {
}
}

View File

@ -19,6 +19,8 @@ package org.springframework.transaction.reactive;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@ -30,12 +32,14 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
import org.springframework.transaction.ConfigurableTransactionManager;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.InvalidTimeoutException;
import org.springframework.transaction.ReactiveTransaction;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionExecutionListener;
import org.springframework.transaction.TransactionSuspensionNotSupportedException;
import org.springframework.transaction.UnexpectedRollbackException;
@ -77,10 +81,24 @@ import org.springframework.transaction.UnexpectedRollbackException;
* @see TransactionSynchronizationManager
*/
@SuppressWarnings("serial")
public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
public abstract class AbstractReactiveTransactionManager
implements ReactiveTransactionManager, ConfigurableTransactionManager, Serializable {
protected transient Log logger = LogFactory.getLog(getClass());
private Collection<TransactionExecutionListener> transactionExecutionListeners = new ArrayList<>();
@Override
public final void setTransactionExecutionListeners(Collection<TransactionExecutionListener> listeners) {
this.transactionExecutionListeners = listeners;
}
@Override
public final Collection<TransactionExecutionListener> getTransactionExecutionListeners() {
return this.transactionExecutionListeners;
}
//---------------------------------------------------------------------
// Implementation of ReactiveTransactionManager
@ -99,8 +117,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
return TransactionSynchronizationManager.forCurrentTransaction()
.flatMap(synchronizationManager -> {
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
Object transaction = doGetTransaction(synchronizationManager);
@ -139,13 +156,16 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
return Mono.defer(() -> {
GenericReactiveTransaction status = newReactiveTransaction(
nestedSynchronizationManager, def, transaction, true,
debugEnabled, suspendedResources.orElse(null));
false, debugEnabled, suspendedResources.orElse(null));
this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
return doBegin(nestedSynchronizationManager, transaction, def)
.doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, def))
.doOnError(ex -> this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex)))
.thenReturn(status);
}).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR,
}).doOnSuccess(status -> this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null)))
.onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR,
ex -> resume(nestedSynchronizationManager, null, suspendedResources.orElse(null))
.then(Mono.error(ex)));
.then(Mono.error(ex)));
}));
}
else {
@ -190,9 +210,13 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
Mono<SuspendedResourcesHolder> suspendedResources = suspend(synchronizationManager, transaction);
return suspendedResources.flatMap(suspendedResourcesHolder -> {
GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
definition, transaction, true, debugEnabled, suspendedResourcesHolder);
return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status)
definition, transaction, true, false, debugEnabled, suspendedResourcesHolder);
this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
return doBegin(synchronizationManager, transaction, definition)
.doOnSuccess(ignore -> prepareSynchronization(synchronizationManager, status, definition))
.doOnError(ex -> this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex)))
.thenReturn(status)
.doOnSuccess(ignore -> this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null)))
.onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx ->
resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx)
.then(Mono.error(beginEx)));
@ -205,7 +229,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
}
// Nested transaction through nested begin and commit/rollback calls.
GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
definition, transaction, true, debugEnabled, null);
definition, transaction, true, true, debugEnabled, null);
return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status);
}
@ -228,7 +252,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
@Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
definition, transaction, newTransaction, debug, suspendedResources);
definition, transaction, newTransaction, false, debug, suspendedResources);
prepareSynchronization(synchronizationManager, status, definition);
return status;
}
@ -238,11 +262,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
*/
private GenericReactiveTransaction newReactiveTransaction(
TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
@Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
@Nullable Object transaction, boolean newTransaction, boolean nested, boolean debug,
@Nullable Object suspendedResources) {
return new GenericReactiveTransaction(transaction, newTransaction,
!synchronizationManager.isSynchronizationActive(),
definition.isReadOnly(), debug, suspendedResources);
return new GenericReactiveTransaction(definition.getName(), transaction,
newTransaction, !synchronizationManager.isSynchronizationActive(),
nested, definition.isReadOnly(), debug, suspendedResources);
}
/**
@ -438,6 +463,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
return doCommit(synchronizationManager, status);
}
return Mono.empty();
@ -449,11 +475,21 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
Mono<Void> result = propagateException;
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
.then(propagateException);
.then(Mono.defer(() -> {
if (status.isNewTransaction()) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
}
return propagateException;
}));
}
else if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) {
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
.then(propagateException);
.then(Mono.defer(() -> {
if (status.isNewTransaction()) {
this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, ex));
}
return propagateException;
}));
}
else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
Mono<Void> mono;
@ -471,7 +507,13 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
})
.then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))));
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))
.then(Mono.defer(() -> {
if (status.isNewTransaction()) {
this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, null));
}
return Mono.empty();
}))));
return commit
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status)
@ -510,6 +552,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
return doRollback(synchronizationManager, status);
}
else {
@ -528,10 +571,22 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
}
})).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion(
synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
.then(Mono.error(ex)))
.then(Mono.defer(() -> {
if (status.isNewTransaction()) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, ex));
}
return Mono.empty();
}))
.then(Mono.error(ex)))
.then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)))
.then(Mono.defer(() -> {
if (status.isNewTransaction()) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
}
return Mono.empty();
}))
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex)))
.then(cleanupAfterCompletion(synchronizationManager, status));
.then(cleanupAfterCompletion(synchronizationManager, status));
}
/**
@ -561,8 +616,16 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
}).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, rbex -> {
logger.error("Commit exception overridden by rollback exception", ex);
return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
.then(Mono.error(rbex));
}).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK));
.then(Mono.defer(() -> {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, rbex));
return Mono.empty();
}))
.then(Mono.error(rbex));
}).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK))
.then(Mono.defer(() -> {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
return Mono.empty();
}));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -40,6 +40,9 @@ import org.springframework.util.Assert;
*/
public class GenericReactiveTransaction implements ReactiveTransaction {
@Nullable
private final String transactionName;
@Nullable
private final Object transaction;
@ -47,6 +50,8 @@ public class GenericReactiveTransaction implements ReactiveTransaction {
private final boolean newSynchronization;
private final boolean nested;
private final boolean readOnly;
private final boolean debug;
@ -61,6 +66,7 @@ public class GenericReactiveTransaction implements ReactiveTransaction {
/**
* Create a new {@code DefaultReactiveTransactionStatus} instance.
* @param transactionName the defined name of the transaction
* @param transaction underlying transaction object that can hold state
* for the internal transaction implementation
* @param newTransaction if the transaction is new, otherwise participating
@ -73,19 +79,35 @@ public class GenericReactiveTransaction implements ReactiveTransaction {
* debug logging should be enabled.
* @param suspendedResources a holder for resources that have been suspended
* for this transaction, if any
* @since 6.1
*/
public GenericReactiveTransaction(
@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
@Nullable String transactionName, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean nested, boolean readOnly, boolean debug,
@Nullable Object suspendedResources) {
this.transactionName = transactionName;
this.transaction = transaction;
this.newTransaction = newTransaction;
this.newSynchronization = newSynchronization;
this.nested = nested;
this.readOnly = readOnly;
this.debug = debug;
this.suspendedResources = suspendedResources;
}
@Deprecated(since = "6.1", forRemoval = true)
public GenericReactiveTransaction(@Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
this(null, transaction, newTransaction, newSynchronization, false, readOnly, debug, suspendedResources);
}
@Override
public String getTransactionName() {
return (this.transactionName != null ? this.transactionName : "");
}
/**
* Return the underlying transaction object.
@ -96,9 +118,7 @@ public class GenericReactiveTransaction implements ReactiveTransaction {
return this.transaction;
}
/**
* Return whether there is an actual transaction active.
*/
@Override
public boolean hasTransaction() {
return (this.transaction != null);
}
@ -109,16 +129,18 @@ public class GenericReactiveTransaction implements ReactiveTransaction {
}
/**
* Return if a new transaction synchronization has been opened
* for this transaction.
* Return if a new transaction synchronization has been opened for this transaction.
*/
public boolean isNewSynchronization() {
return this.newSynchronization;
}
/**
* Return if this transaction is defined as read-only transaction.
*/
@Override
public boolean isNested() {
return this.nested;
}
@Override
public boolean isReadOnly() {
return this.readOnly;
}
@ -143,6 +165,9 @@ public class GenericReactiveTransaction implements ReactiveTransaction {
@Override
public void setRollbackOnly() {
if (this.completed) {
throw new IllegalStateException("Transaction completed");
}
this.rollbackOnly = true;
}

View File

@ -19,6 +19,8 @@ package org.springframework.transaction.support;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -26,12 +28,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.lang.Nullable;
import org.springframework.transaction.ConfigurableTransactionManager;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.InvalidTimeoutException;
import org.springframework.transaction.NestedTransactionNotSupportedException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionExecutionListener;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionSuspensionNotSupportedException;
import org.springframework.transaction.UnexpectedRollbackException;
@ -82,7 +86,8 @@ import org.springframework.util.Assert;
* @see org.springframework.transaction.jta.JtaTransactionManager
*/
@SuppressWarnings("serial")
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
public abstract class AbstractPlatformTransactionManager
implements PlatformTransactionManager, ConfigurableTransactionManager, Serializable {
/**
* Always activate transaction synchronization, even for "empty" transactions
@ -136,6 +141,8 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
private boolean rollbackOnCommitFailure = false;
private Collection<TransactionExecutionListener> transactionExecutionListeners = new ArrayList<>();
/**
* Set the transaction synchronization by the name of the corresponding constant
@ -339,6 +346,16 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
return this.rollbackOnCommitFailure;
}
@Override
public final void setTransactionExecutionListeners(Collection<TransactionExecutionListener> listeners) {
this.transactionExecutionListeners = listeners;
}
@Override
public final Collection<TransactionExecutionListener> getTransactionExecutionListeners() {
return this.transactionExecutionListeners;
}
//---------------------------------------------------------------------
// Implementation of PlatformTransactionManager
@ -385,7 +402,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
@ -403,20 +420,6 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
}
/**
* Start a new transaction.
*/
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
/**
* Create a TransactionStatus for an existing transaction.
*/
@ -446,7 +449,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
return startTransaction(definition, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
@ -467,16 +470,24 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, false, false, true, debugEnabled, null);
this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
try {
status.createAndHoldSavepoint();
}
catch (RuntimeException | Error ex) {
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
throw ex;
}
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
return startTransaction(definition, transaction, true, debugEnabled, null);
}
}
@ -506,18 +517,40 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
/**
* Start a new transaction.
*/
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
try {
doBegin(transaction, definition);
}
catch (RuntimeException | Error ex) {
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
throw ex;
}
prepareSynchronization(status, definition);
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
return status;
}
/**
* Create a new TransactionStatus for the given arguments,
* also initializing transaction synchronization as appropriate.
* @see #newTransactionStatus
* @see #prepareTransactionStatus
*/
protected final DefaultTransactionStatus prepareTransactionStatus(
private DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
definition, transaction, newTransaction, newSynchronization, false, debug, suspendedResources);
prepareSynchronization(status, definition);
return status;
}
@ -525,21 +558,20 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
/**
* Create a TransactionStatus instance for the given arguments.
*/
protected DefaultTransactionStatus newTransactionStatus(
private DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
boolean newSynchronization, boolean nested, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
return new DefaultTransactionStatus(definition.getName(), transaction, newTransaction,
actualNewSynchronization, nested, definition.isReadOnly(), debug, suspendedResources);
}
/**
* Initialize transaction synchronization as appropriate.
*/
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
private void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
@ -734,6 +766,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
boolean commitListenerInvoked = false;
try {
boolean unexpectedRollback = false;
@ -747,6 +780,8 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
commitListenerInvoked = true;
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
@ -754,6 +789,8 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
commitListenerInvoked = true;
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
@ -768,17 +805,19 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
if (commitListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, ex));
}
}
throw ex;
}
@ -797,6 +836,9 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
if (commitListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, null));
}
}
}
@ -832,6 +874,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
boolean rollbackListenerInvoked = false;
try {
triggerBeforeCompletion(status);
@ -840,12 +883,16 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
rollbackListenerInvoked = true;
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
rollbackListenerInvoked = true;
doRollback(status);
}
else {
@ -874,10 +921,16 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
if (rollbackListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, ex));
}
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
if (rollbackListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
}
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
@ -915,9 +968,11 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
catch (RuntimeException | Error rbex) {
logger.error("Commit exception overridden by rollback exception", ex);
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, rbex));
throw rbex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -60,6 +60,9 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
@Override
public void setRollbackOnly() {
if (this.completed) {
throw new IllegalStateException("Transaction completed");
}
this.rollbackOnly = true;
}
@ -216,16 +219,4 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
throw new NestedTransactionNotSupportedException("This transaction does not support savepoints");
}
//---------------------------------------------------------------------
// Flushing support
//---------------------------------------------------------------------
/**
* This implementation is empty, considering flush as a no-op.
*/
@Override
public void flush() {
}
}

View File

@ -50,6 +50,9 @@ import org.springframework.util.Assert;
*/
public class DefaultTransactionStatus extends AbstractTransactionStatus {
@Nullable
private final String transactionName;
@Nullable
private final Object transaction;
@ -57,6 +60,8 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus {
private final boolean newSynchronization;
private final boolean nested;
private final boolean readOnly;
private final boolean debug;
@ -67,6 +72,7 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus {
/**
* Create a new {@code DefaultTransactionStatus} instance.
* @param transactionName the defined name of the transaction
* @param transaction underlying transaction object that can hold state
* for the internal transaction implementation
* @param newTransaction if the transaction is new, otherwise participating
@ -79,19 +85,35 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus {
* debug logging should be enabled.
* @param suspendedResources a holder for resources that have been suspended
* for this transaction, if any
* @since 6.1
*/
public DefaultTransactionStatus(
@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
@Nullable String transactionName, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean nested, boolean readOnly, boolean debug,
@Nullable Object suspendedResources) {
this.transactionName = transactionName;
this.transaction = transaction;
this.newTransaction = newTransaction;
this.newSynchronization = newSynchronization;
this.nested = nested;
this.readOnly = readOnly;
this.debug = debug;
this.suspendedResources = suspendedResources;
}
@Deprecated(since = "6.1", forRemoval = true)
public DefaultTransactionStatus(@Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
this(null, transaction, newTransaction, newSynchronization, false, readOnly, debug, suspendedResources);
}
@Override
public String getTransactionName() {
return (this.transactionName != null ? this.transactionName : "");
}
/**
* Return the underlying transaction object.
@ -102,9 +124,7 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus {
return this.transaction;
}
/**
* Return whether there is an actual transaction active.
*/
@Override
public boolean hasTransaction() {
return (this.transaction != null);
}
@ -115,16 +135,18 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus {
}
/**
* Return if a new transaction synchronization has been opened
* for this transaction.
* Return if a new transaction synchronization has been opened for this transaction.
*/
public boolean isNewSynchronization() {
return this.newSynchronization;
}
/**
* Return if this transaction is defined as read-only transaction.
*/
@Override
public boolean isNested() {
return this.nested;
}
@Override
public boolean isReadOnly() {
return this.readOnly;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,29 +23,34 @@ import java.io.Flushable;
* return an internal rollback-only marker, typically from another
* transaction that has participated and marked it as rollback-only.
*
* <p>Autodetected by DefaultTransactionStatus, to always return a
* current rollbackOnly flag even if not resulting from the current
* <p>Autodetected by {@link DefaultTransactionStatus} in order to always
* return a current rollbackOnly flag even if not resulting from the current
* TransactionStatus.
*
* @author Juergen Hoeller
* @since 1.1
* @see DefaultTransactionStatus#isRollbackOnly
* @see DefaultTransactionStatus#isGlobalRollbackOnly()
*/
public interface SmartTransactionObject extends Flushable {
/**
* Return whether the transaction is internally marked as rollback-only.
* Can, for example, check the JTA UserTransaction.
* <p>The default implementation returns {@code false}.
* @see jakarta.transaction.UserTransaction#getStatus
* @see jakarta.transaction.Status#STATUS_MARKED_ROLLBACK
*/
boolean isRollbackOnly();
default boolean isRollbackOnly() {
return false;
}
/**
* Flush the underlying sessions to the datastore, if applicable:
* for example, all affected Hibernate/JPA sessions.
* <p>The default implementation is empty, considering flush as a no-op.
*/
@Override
void flush();
default void flush() {
}
}

View File

@ -72,10 +72,17 @@ public class JtaTransactionManagerTests {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
// something transactional
assertThat(status.getTransactionName()).isEqualTo("txName");
assertThat(status.hasTransaction()).isTrue();
assertThat(status.isNewTransaction()).isTrue();
assertThat(status.isNested()).isFalse();
assertThat(status.isReadOnly()).isFalse();
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isTrue();
TransactionSynchronizationManager.registerSynchronization(synch);
assertThat(TransactionSynchronizationManager.getCurrentTransactionName()).isEqualTo("txName");
assertThat(TransactionSynchronizationManager.isCurrentTransactionReadOnly()).isFalse();
assertThat(status.isRollbackOnly()).isFalse();
assertThat(status.isCompleted()).isFalse();
}
});
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
@ -105,8 +112,15 @@ public class JtaTransactionManagerTests {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
// something transactional
assertThat(status.getTransactionName()).isEmpty();
assertThat(status.hasTransaction()).isTrue();
assertThat(status.isNewTransaction()).isTrue();
assertThat(status.isNested()).isFalse();
assertThat(status.isReadOnly()).isFalse();
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isTrue();
TransactionSynchronizationManager.registerSynchronization(synch);
assertThat(status.isRollbackOnly()).isFalse();
assertThat(status.isCompleted()).isFalse();
}
});
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
@ -134,7 +148,14 @@ public class JtaTransactionManagerTests {
tt.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
assertThat(status.getTransactionName()).isEmpty();
assertThat(status.hasTransaction()).isTrue();
assertThat(status.isNewTransaction()).isTrue();
assertThat(status.isNested()).isFalse();
assertThat(status.isReadOnly()).isFalse();
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
assertThat(status.isRollbackOnly()).isFalse();
assertThat(status.isCompleted()).isFalse();
}
});
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
@ -160,11 +181,18 @@ public class JtaTransactionManagerTests {
tt.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
assertThat(status.getTransactionName()).isEqualTo("txName");
assertThat(status.hasTransaction()).isTrue();
assertThat(status.isNewTransaction()).isTrue();
assertThat(status.isNested()).isFalse();
assertThat(status.isReadOnly()).isFalse();
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isTrue();
TransactionSynchronizationManager.registerSynchronization(synch);
assertThat(TransactionSynchronizationManager.getCurrentTransactionName()).isEqualTo("txName");
assertThat(TransactionSynchronizationManager.isCurrentTransactionReadOnly()).isFalse();
status.setRollbackOnly();
assertThat(status.isRollbackOnly()).isTrue();
assertThat(status.isCompleted()).isFalse();
}
});
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
@ -192,9 +220,16 @@ public class JtaTransactionManagerTests {
tt.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
assertThat(status.getTransactionName()).isEmpty();
assertThat(status.hasTransaction()).isTrue();
assertThat(status.isNewTransaction()).isTrue();
assertThat(status.isNested()).isFalse();
assertThat(status.isReadOnly()).isFalse();
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isTrue();
TransactionSynchronizationManager.registerSynchronization(synch);
status.setRollbackOnly();
assertThat(status.isRollbackOnly()).isTrue();
assertThat(status.isCompleted()).isFalse();
}
});
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
@ -221,8 +256,16 @@ public class JtaTransactionManagerTests {
tt.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
assertThat(status.getTransactionName()).isEmpty();
assertThat(status.hasTransaction()).isTrue();
assertThat(status.isNewTransaction()).isTrue();
assertThat(status.isNested()).isFalse();
assertThat(status.isReadOnly()).isFalse();
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
assertThat(status.isRollbackOnly()).isFalse();
status.setRollbackOnly();
assertThat(status.isRollbackOnly()).isTrue();
assertThat(status.isCompleted()).isFalse();
}
});
assertThat(TransactionSynchronizationManager.isSynchronizationActive()).isFalse();
@ -808,6 +851,13 @@ public class JtaTransactionManagerTests {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
// something transactional
assertThat(status.getTransactionName()).isEmpty();
assertThat(status.hasTransaction()).isTrue();
assertThat(status.isNewTransaction()).isTrue();
assertThat(status.isNested()).isTrue();
assertThat(status.isReadOnly()).isFalse();
assertThat(status.isRollbackOnly()).isFalse();
assertThat(status.isCompleted()).isFalse();
}
});
@ -859,8 +909,11 @@ public class JtaTransactionManagerTests {
given(ut.getStatus()).willReturn(Status.STATUS_NO_TRANSACTION);
willThrow(new SystemException("system exception")).given(ut).begin();
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
ptm.addListener(tl);
assertThatExceptionOfType(CannotCreateTransactionException.class).isThrownBy(() -> {
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TransactionTemplate tt = new TransactionTemplate(ptm);
tt.execute(new TransactionCallbackWithoutResult() {
@Override
@ -869,6 +922,16 @@ public class JtaTransactionManagerTests {
}
});
});
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beginFailure).isInstanceOf(CannotCreateTransactionException.class);
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.commitFailure).isNull();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
assertThat(tl.rollbackFailure).isNull();
}
@Test
@ -878,8 +941,11 @@ public class JtaTransactionManagerTests {
Status.STATUS_ACTIVE, Status.STATUS_ACTIVE);
willThrow(new RollbackException("unexpected rollback")).given(ut).commit();
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
ptm.addListener(tl);
assertThatExceptionOfType(UnexpectedRollbackException.class).isThrownBy(() -> {
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TransactionTemplate tt = new TransactionTemplate(ptm);
tt.execute(new TransactionCallbackWithoutResult() {
@Override
@ -896,6 +962,16 @@ public class JtaTransactionManagerTests {
});
});
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beginFailure).isNull();
assertThat(tl.beforeCommitCalled).isTrue();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.commitFailure).isNull();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isTrue();
assertThat(tl.rollbackFailure).isNull();
verify(ut).begin();
}
@ -1023,8 +1099,11 @@ public class JtaTransactionManagerTests {
Status.STATUS_ACTIVE, Status.STATUS_ACTIVE);
willThrow(new SystemException("system exception")).given(ut).commit();
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
ptm.addListener(tl);
assertThatExceptionOfType(TransactionSystemException.class).isThrownBy(() -> {
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TransactionTemplate tt = new TransactionTemplate(ptm);
tt.execute(new TransactionCallbackWithoutResult() {
@Override
@ -1041,6 +1120,16 @@ public class JtaTransactionManagerTests {
});
});
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beginFailure).isNull();
assertThat(tl.beforeCommitCalled).isTrue();
assertThat(tl.afterCommitCalled).isTrue();
assertThat(tl.commitFailure).isInstanceOf(TransactionSystemException.class);
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
assertThat(tl.rollbackFailure).isNull();
verify(ut).begin();
}
@ -1050,8 +1139,11 @@ public class JtaTransactionManagerTests {
given(ut.getStatus()).willReturn(Status.STATUS_NO_TRANSACTION, Status.STATUS_ACTIVE);
willThrow(new SystemException("system exception")).given(ut).rollback();
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
ptm.addListener(tl);
assertThatExceptionOfType(TransactionSystemException.class).isThrownBy(() -> {
JtaTransactionManager ptm = newJtaTransactionManager(ut);
TransactionTemplate tt = new TransactionTemplate(ptm);
tt.execute(new TransactionCallbackWithoutResult() {
@Override
@ -1068,6 +1160,16 @@ public class JtaTransactionManagerTests {
});
});
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beginFailure).isNull();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.commitFailure).isNull();
assertThat(tl.beforeRollbackCalled).isTrue();
assertThat(tl.afterRollbackCalled).isTrue();
assertThat(tl.rollbackFailure).isInstanceOf(TransactionSystemException.class);
verify(ut).begin();
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction;
import org.springframework.lang.Nullable;
/**
* @author Juergen Hoeller
* @since 6.1
*/
public class TestTransactionExecutionListener implements TransactionExecutionListener {
public boolean beforeBeginCalled;
public boolean afterBeginCalled;
@Nullable
public Throwable beginFailure;
public boolean beforeCommitCalled;
public boolean afterCommitCalled;
@Nullable
public Throwable commitFailure;
public boolean beforeRollbackCalled;
public boolean afterRollbackCalled;
@Nullable
public Throwable rollbackFailure;
@Override
public void beforeBegin(TransactionExecution transactionState) {
this.beforeBeginCalled = true;
}
@Override
public void afterBegin(TransactionExecution transactionState, @Nullable Throwable beginFailure) {
this.afterBeginCalled = true;
this.beginFailure = beginFailure;
}
@Override
public void beforeCommit(TransactionExecution transactionState) {
this.beforeCommitCalled = true;
}
@Override
public void afterCommit(TransactionExecution transactionState, @Nullable Throwable commitFailure) {
this.afterCommitCalled = true;
this.commitFailure = commitFailure;
}
@Override
public void beforeRollback(TransactionExecution transactionState) {
this.beforeRollbackCalled = true;
}
@Override
public void afterRollback(TransactionExecution transactionState, @Nullable Throwable rollbackFailure) {
this.afterRollbackCalled = true;
this.rollbackFailure = rollbackFailure;
}
}

View File

@ -16,8 +16,11 @@
package org.springframework.transaction.reactive;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@ -26,6 +29,7 @@ import org.springframework.transaction.TransactionDefinition;
* Test implementation of a {@link ReactiveTransactionManager}.
*
* @author Mark Paluch
* @author Juergen Hoeller
*/
@SuppressWarnings("serial")
class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager {
@ -36,7 +40,11 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
private final boolean canCreateTransaction;
private final boolean forceFailOnCommit;
@Nullable
private Function<String, RuntimeException> forceFailOnCommit;
@Nullable
private Function<String, RuntimeException> forceFailOnRollback;
protected boolean begin = false;
@ -50,13 +58,15 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) {
this(existingTransaction, canCreateTransaction, false);
}
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction, boolean forceFailOnCommit) {
this.existingTransaction = existingTransaction;
this.canCreateTransaction = canCreateTransaction;
}
ReactiveTestTransactionManager(boolean existingTransaction, @Nullable Function<String, RuntimeException> forceFailOnCommit, @Nullable Function<String, RuntimeException> forceFailOnRollback) {
this.existingTransaction = existingTransaction;
this.canCreateTransaction = true;
this.forceFailOnCommit = forceFailOnCommit;
this.forceFailOnRollback = forceFailOnRollback;
}
@ -88,8 +98,8 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
}
return Mono.fromRunnable(() -> {
this.commit = true;
if (this.forceFailOnCommit) {
throw new IllegalArgumentException("Forced failure on commit");
if (this.forceFailOnCommit != null) {
throw this.forceFailOnCommit.apply("Forced failure on commit");
}
});
}
@ -99,7 +109,12 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
if (!TRANSACTION.equals(status.getTransaction())) {
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
}
return Mono.fromRunnable(() -> this.rollback = true);
return Mono.fromRunnable(() -> {
this.rollback = true;
if (this.forceFailOnRollback != null) {
throw this.forceFailOnRollback.apply("Forced failure on rollback");
}
});
}
@Override

View File

@ -21,10 +21,14 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.ReactiveTransaction;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TestTransactionExecutionListener;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import static org.assertj.core.api.Assertions.assertThat;
@ -86,6 +90,9 @@ public class ReactiveTransactionSupportTests {
@Test
public void commitWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete();
@ -95,11 +102,21 @@ public class ReactiveTransactionSupportTests {
assertHasNoRollback(tm);
assertHasNotSetRollbackOnly(tm);
assertHasCleanedUp(tm);
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beforeCommitCalled).isTrue();
assertThat(tl.afterCommitCalled).isTrue();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
public void rollbackWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
.contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
@ -109,11 +126,21 @@ public class ReactiveTransactionSupportTests {
assertHasRolledBack(tm);
assertHasNotSetRollbackOnly(tm);
assertHasCleanedUp(tm);
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isTrue();
assertThat(tl.afterRollbackCalled).isTrue();
}
@Test
public void rollbackOnlyWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly)
.flatMap(tm::commit)
.contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
@ -124,11 +151,21 @@ public class ReactiveTransactionSupportTests {
assertHasRolledBack(tm);
assertHasNotSetRollbackOnly(tm);
assertHasCleanedUp(tm);
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isTrue();
assertThat(tl.afterRollbackCalled).isTrue();
}
@Test
public void commitWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete();
@ -138,11 +175,21 @@ public class ReactiveTransactionSupportTests {
assertHasNoRollback(tm);
assertHasNotSetRollbackOnly(tm);
assertHasNotCleanedUp(tm);
assertThat(tl.beforeBeginCalled).isFalse();
assertThat(tl.afterBeginCalled).isFalse();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
public void rollbackWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
.contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
@ -152,11 +199,21 @@ public class ReactiveTransactionSupportTests {
assertHasNoRollback(tm);
assertHasSetRollbackOnly(tm);
assertHasNotCleanedUp(tm);
assertThat(tl.beforeBeginCalled).isFalse();
assertThat(tl.afterBeginCalled).isFalse();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
public void rollbackOnlyWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly).flatMap(tm::commit)
.contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
@ -166,6 +223,13 @@ public class ReactiveTransactionSupportTests {
assertHasNoRollback(tm);
assertHasSetRollbackOnly(tm);
assertHasNotCleanedUp(tm);
assertThat(tl.beforeBeginCalled).isFalse();
assertThat(tl.afterBeginCalled).isFalse();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
@ -203,10 +267,11 @@ public class ReactiveTransactionSupportTests {
assertHasCleanedUp(tm);
}
//gh-28968
@Test
@Test // gh-28968
void errorInCommitDoesInitiateRollbackAfterCommit() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true, true);
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, ConcurrencyFailureException::new, null);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
TransactionalOperator rxtx = TransactionalOperator.create(tm);
StepVerifier.create(rxtx.transactional(Mono.just("bar")))
@ -217,8 +282,79 @@ public class ReactiveTransactionSupportTests {
assertHasRolledBack(tm);
assertHasNotSetRollbackOnly(tm);
assertHasCleanedUp(tm);
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beforeCommitCalled).isTrue();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isTrue();
}
@Test
public void beginFailure() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, false);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyError();
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beginFailure).isInstanceOf(CannotCreateTransactionException.class);
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.commitFailure).isNull();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
assertThat(tl.rollbackFailure).isNull();
}
@Test
public void commitFailure() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, TransactionSystemException::new, null);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyError();
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beginFailure).isNull();
assertThat(tl.beforeCommitCalled).isTrue();
assertThat(tl.afterCommitCalled).isTrue();
assertThat(tl.commitFailure).isInstanceOf(TransactionSystemException.class);
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
assertThat(tl.rollbackFailure).isNull();
}
@Test
public void rollbackFailure() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, null, TransactionSystemException::new);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
.contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyError();
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beginFailure).isNull();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.commitFailure).isNull();
assertThat(tl.beforeRollbackCalled).isTrue();
assertThat(tl.afterRollbackCalled).isTrue();
assertThat(tl.rollbackFailure).isInstanceOf(TransactionSystemException.class);
}
private void assertHasBegan(ReactiveTestTransactionManager actual) {
assertThat(actual.begin).as("Expected <ReactiveTransactionManager.begin()> but was <begin()> was not invoked").isTrue();
}

View File

@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.MockCallbackPreferringTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TestTransactionExecutionListener;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionSystemException;
@ -63,6 +64,7 @@ class TransactionSupportTests {
@Test
void noExistingTransaction() {
PlatformTransactionManager tm = new TestTransactionManager(false, true);
DefaultTransactionStatus status1 = (DefaultTransactionStatus)
tm.getTransaction(new DefaultTransactionDefinition(PROPAGATION_SUPPORTS));
assertThat(status1.hasTransaction()).as("Must not have transaction").isFalse();
@ -72,13 +74,14 @@ class TransactionSupportTests {
assertThat(status2.hasTransaction()).as("Must have transaction").isTrue();
assertThat(status2.isNewTransaction()).as("Must be new transaction").isTrue();
assertThatExceptionOfType(IllegalTransactionStateException.class).isThrownBy(() ->
tm.getTransaction(new DefaultTransactionDefinition(PROPAGATION_MANDATORY)));
assertThatExceptionOfType(IllegalTransactionStateException.class)
.isThrownBy(() -> tm.getTransaction(new DefaultTransactionDefinition(PROPAGATION_MANDATORY)));
}
@Test
void existingTransaction() {
PlatformTransactionManager tm = new TestTransactionManager(true, true);
DefaultTransactionStatus status1 = (DefaultTransactionStatus)
tm.getTransaction(new DefaultTransactionDefinition(PROPAGATION_SUPPORTS));
assertThat(status1.getTransaction()).as("Must have transaction").isNotNull();
@ -98,6 +101,9 @@ class TransactionSupportTests {
@Test
void commitWithoutExistingTransaction() {
TestTransactionManager tm = new TestTransactionManager(false, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
TransactionStatus status = tm.getTransaction(null);
tm.commit(status);
@ -105,11 +111,21 @@ class TransactionSupportTests {
assertThat(tm.commit).as("triggered commit").isTrue();
assertThat(tm.rollback).as("no rollback").isFalse();
assertThat(tm.rollbackOnly).as("no rollbackOnly").isFalse();
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beforeCommitCalled).isTrue();
assertThat(tl.afterCommitCalled).isTrue();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
void rollbackWithoutExistingTransaction() {
TestTransactionManager tm = new TestTransactionManager(false, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
TransactionStatus status = tm.getTransaction(null);
tm.rollback(status);
@ -117,11 +133,21 @@ class TransactionSupportTests {
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("triggered rollback").isTrue();
assertThat(tm.rollbackOnly).as("no rollbackOnly").isFalse();
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isTrue();
assertThat(tl.afterRollbackCalled).isTrue();
}
@Test
void rollbackOnlyWithoutExistingTransaction() {
TestTransactionManager tm = new TestTransactionManager(false, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
TransactionStatus status = tm.getTransaction(null);
status.setRollbackOnly();
tm.commit(status);
@ -130,11 +156,21 @@ class TransactionSupportTests {
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("triggered rollback").isTrue();
assertThat(tm.rollbackOnly).as("no rollbackOnly").isFalse();
assertThat(tl.beforeBeginCalled).isTrue();
assertThat(tl.afterBeginCalled).isTrue();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isTrue();
assertThat(tl.afterRollbackCalled).isTrue();
}
@Test
void commitWithExistingTransaction() {
TestTransactionManager tm = new TestTransactionManager(true, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
TransactionStatus status = tm.getTransaction(null);
tm.commit(status);
@ -142,11 +178,21 @@ class TransactionSupportTests {
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("no rollback").isFalse();
assertThat(tm.rollbackOnly).as("no rollbackOnly").isFalse();
assertThat(tl.beforeBeginCalled).isFalse();
assertThat(tl.afterBeginCalled).isFalse();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
void rollbackWithExistingTransaction() {
TestTransactionManager tm = new TestTransactionManager(true, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
TransactionStatus status = tm.getTransaction(null);
tm.rollback(status);
@ -154,11 +200,21 @@ class TransactionSupportTests {
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("no rollback").isFalse();
assertThat(tm.rollbackOnly).as("triggered rollbackOnly").isTrue();
assertThat(tl.beforeBeginCalled).isFalse();
assertThat(tl.afterBeginCalled).isFalse();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
void rollbackOnlyWithExistingTransaction() {
TestTransactionManager tm = new TestTransactionManager(true, true);
TestTransactionExecutionListener tl = new TestTransactionExecutionListener();
tm.addListener(tl);
TransactionStatus status = tm.getTransaction(null);
status.setRollbackOnly();
tm.commit(status);
@ -167,6 +223,13 @@ class TransactionSupportTests {
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("no rollback").isFalse();
assertThat(tm.rollbackOnly).as("triggered rollbackOnly").isTrue();
assertThat(tl.beforeBeginCalled).isFalse();
assertThat(tl.afterBeginCalled).isFalse();
assertThat(tl.beforeCommitCalled).isFalse();
assertThat(tl.afterCommitCalled).isFalse();
assertThat(tl.beforeRollbackCalled).isFalse();
assertThat(tl.afterRollbackCalled).isFalse();
}
@Test
@ -204,14 +267,14 @@ class TransactionSupportTests {
TestTransactionManager tm = new TestTransactionManager(false, true);
TransactionTemplate template = new TransactionTemplate(tm);
RuntimeException ex = new RuntimeException("Some application exception");
assertThatRuntimeException().isThrownBy(() ->
template.execute(new TransactionCallbackWithoutResult() {
assertThatRuntimeException()
.isThrownBy(() -> template.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
throw ex;
}
}))
.isSameAs(ex);
.isSameAs(ex);
assertThat(tm.begin).as("triggered begin").isTrue();
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("triggered rollback").isTrue();
@ -232,8 +295,8 @@ class TransactionSupportTests {
TransactionTemplate template = new TransactionTemplate(tm);
RuntimeException ex = new RuntimeException("Some application exception");
assertThatRuntimeException()
.isThrownBy(() -> template.executeWithoutResult(status -> { throw ex; }))
.isSameAs(tex);
.isThrownBy(() -> template.executeWithoutResult(status -> { throw ex; }))
.isSameAs(tex);
assertThat(tm.begin).as("triggered begin").isTrue();
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("triggered rollback").isTrue();
@ -245,7 +308,7 @@ class TransactionSupportTests {
TestTransactionManager tm = new TestTransactionManager(false, true);
TransactionTemplate template = new TransactionTemplate(tm);
assertThatExceptionOfType(Error.class)
.isThrownBy(() -> template.executeWithoutResult(status -> { throw new Error("Some application error"); }));
.isThrownBy(() -> template.executeWithoutResult(status -> { throw new Error("Some application error"); }));
assertThat(tm.begin).as("triggered begin").isTrue();
assertThat(tm.commit).as("no commit").isFalse();
assertThat(tm.rollback).as("triggered rollback").isTrue();
@ -265,12 +328,12 @@ class TransactionSupportTests {
assertThat(template3).isEqualTo(template2);
}
@Nested
class AbstractPlatformTransactionManagerConfigurationTests {
private final AbstractPlatformTransactionManager tm = new TestTransactionManager(false, true);
@Test
void setTransactionSynchronizationNameToUnsupportedValues() {
assertThatIllegalArgumentException().isThrownBy(() -> tm.setTransactionSynchronizationName(null));
@ -302,22 +365,20 @@ class TransactionSupportTests {
assertThat(tm.getTransactionSynchronization()).isEqualTo(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
}
private static Stream<String> streamSynchronizationConstants() {
return Arrays.stream(AbstractPlatformTransactionManager.class.getFields())
.filter(ReflectionUtils::isPublicStaticFinal)
.map(Field::getName)
.filter(name -> name.startsWith("SYNCHRONIZATION_"));
}
}
@Nested
class TransactionTemplateConfigurationTests {
private final TransactionTemplate template = new TransactionTemplate();
@Test
void setTransactionManager() {
TestTransactionManager tm = new TestTransactionManager(false, true);
@ -422,7 +483,6 @@ class TransactionSupportTests {
.filter(ReflectionUtils::isPublicStaticFinal)
.map(Field::getName);
}
}
}