diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java
index adbaf38106e..e30a3628314 100644
--- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java
+++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,8 +18,6 @@ package org.springframework.transaction;
import reactor.core.publisher.Mono;
-import org.springframework.lang.Nullable;
-
/**
* This is the central interface in Spring's reactive transaction infrastructure.
* Applications can use this directly, but it is not primarily meant as API:
@@ -27,6 +25,7 @@ import org.springframework.lang.Nullable;
* declarative transaction demarcation through AOP.
*
* @author Mark Paluch
+ * @author Juergen Hoeller
* @since 5.2
* @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean
*/
@@ -43,7 +42,7 @@ public interface ReactiveTransactionManager {
*
An exception to the above rule is the read-only flag, which should be
* ignored if no explicit read-only mode is supported. Essentially, the
* read-only flag is just a hint for potential optimization.
- * @param definition the TransactionDefinition instance (can be empty for defaults),
+ * @param definition the TransactionDefinition instance,
* describing propagation behavior, isolation level, timeout etc.
* @return transaction status object representing the new or current transaction
* @throws TransactionException in case of lookup, creation, or system errors
@@ -55,7 +54,7 @@ public interface ReactiveTransactionManager {
* @see TransactionDefinition#getTimeout
* @see TransactionDefinition#isReadOnly
*/
- Mono getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
+ Mono getTransaction(TransactionDefinition definition) throws TransactionException;
/**
* Commit the given transaction, with regard to its status. If the transaction
diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java
index 99c3f77394e..01d5f16ab06 100644
--- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java
+++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,8 +16,6 @@
package org.springframework.transaction;
-import reactor.core.publisher.Mono;
-
/**
* Representation of the status of a transaction exposing a reactive
* interface.
@@ -27,6 +25,7 @@ import reactor.core.publisher.Mono;
* an exception that causes an implicit rollback).
*
* @author Mark Paluch
+ * @author Juergen Hoeller
* @since 5.2
* @see #setRollbackOnly()
* @see ReactiveTransactionManager#getTransaction
@@ -45,10 +44,10 @@ public interface ReactiveTransactionStatus {
* that the only possible outcome of the transaction may be a rollback, as
* alternative to throwing an exception which would in turn trigger a rollback.
*
This is mainly intended for transactions managed by
- * {@link org.springframework.transaction.reactive.support.TransactionalOperator} or
- * {@link org.springframework.transaction.interceptor.ReactiveTransactionInterceptor},
+ * {@link org.springframework.transaction.reactive.TransactionalOperator} or
+ * {@link org.springframework.transaction.interceptor.TransactionInterceptor},
* where the actual commit/rollback decision is made by the container.
- * @see org.springframework.transaction.reactive.support.ReactiveTransactionCallback#doInTransaction
+ * @see org.springframework.transaction.reactive.ReactiveTransactionCallback#doInTransaction
* @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn
*/
void setRollbackOnly();
@@ -59,15 +58,6 @@ public interface ReactiveTransactionStatus {
*/
boolean isRollbackOnly();
- /**
- * Flush the underlying session to the datastore, if applicable.
- *
This is effectively just a hint and may be a no-op if the underlying
- * transaction manager does not have a flush concept. A flush signal may
- * get applied to the primary resource or to transaction synchronizations,
- * depending on the underlying resource.
- */
- Mono flush();
-
/**
* Return whether this transaction is completed, that is,
* whether it has already been committed or rolled back.
@@ -75,4 +65,5 @@ public interface ReactiveTransactionStatus {
* @see ReactiveTransactionManager#rollback
*/
boolean isCompleted();
+
}
diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java
index 526767192db..849c6bced5c 100644
--- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java
+++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java
@@ -19,7 +19,6 @@ package org.springframework.transaction.reactive;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
-import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,25 +29,20 @@ import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import org.springframework.core.Constants;
import org.springframework.lang.Nullable;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.InvalidTimeoutException;
-import org.springframework.transaction.NestedTransactionNotSupportedException;
+import org.springframework.transaction.ReactiveTransactionManager;
+import org.springframework.transaction.ReactiveTransactionStatus;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSuspensionNotSupportedException;
import org.springframework.transaction.UnexpectedRollbackException;
-import org.springframework.transaction.ReactiveTransactionManager;
-import org.springframework.transaction.ReactiveTransactionStatus;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
-import org.springframework.transaction.support.TransactionSynchronizationManager;
-import org.springframework.util.Assert;
-
/**
* Abstract base class that implements Spring's standard reactive transaction workflow,
* serving as basis for concrete platform transaction managers.
+ *
*
This base class provides the following workflow handling:
*
*
determines if there is an existing transaction;
@@ -57,17 +51,19 @@ import org.springframework.util.Assert;
*
checks the rollback-only flag on commit;
*
applies the appropriate modification on rollback
* (actual rollback or setting rollback-only);
- *
Subclasses have to implement specific template methods for specific
* states of a transaction, e.g.: begin, suspend, resume, commit, rollback.
* The most important of them are abstract and must be provided by a concrete
* implementation; for the rest, defaults are provided, so overriding is optional.
+ *
*
Transaction synchronization is a generic mechanism for registering callbacks
* that get invoked at transaction completion time. This is mainly used internally
* by the data access support classes for R2DBC, MongoDB, etc. The same mechanism can
* also be leveraged for custom synchronization needs in an application.
+ *
*
The state of this class is serializable, to allow for serializing the
* transaction strategy along with proxies that carry a transaction interceptor.
* It is up to subclasses if they wish to make their state to be serializable too.
@@ -76,248 +72,15 @@ import org.springframework.util.Assert;
* to Java serialization rules) if they need to restore any transient state.
*
* @author Mark Paluch
+ * @author Juergen Hoeller
* @since 5.2
- * @see #setTransactionSynchronization
* @see ReactiveTransactionSynchronizationManager
*/
-@SuppressWarnings({"serial", "WeakerAccess"})
+@SuppressWarnings("serial")
public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
- /**
- * Always activate transaction synchronization, even for "empty" transactions
- * that result from PROPAGATION_SUPPORTS with no existing backend transaction.
- *
- * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_SUPPORTS
- * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NOT_SUPPORTED
- * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NEVER
- */
- public static final int SYNCHRONIZATION_ALWAYS = 0;
-
- /**
- * Activate transaction synchronization only for actual transactions,
- * that is, not for empty ones that result from PROPAGATION_SUPPORTS with
- * no existing backend transaction.
- *
- * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_REQUIRED
- * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_MANDATORY
- * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_REQUIRES_NEW
- */
- public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;
-
- /**
- * Never active transaction synchronization, not even for actual transactions.
- */
- public static final int SYNCHRONIZATION_NEVER = 2;
-
-
- /**
- * Constants instance for AbstractReactiveTransactionManager.
- */
- private static final Constants constants = new Constants(AbstractReactiveTransactionManager.class);
-
-
protected transient Log logger = LogFactory.getLog(getClass());
- private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;
-
- private Duration defaultTimeout = Duration.ofSeconds(TransactionDefinition.TIMEOUT_DEFAULT);
-
- private boolean nestedTransactionAllowed = false;
-
- private boolean validateExistingTransaction = false;
-
- private boolean globalRollbackOnParticipationFailure = true;
-
- private boolean failEarlyOnGlobalRollbackOnly = false;
-
- private boolean rollbackOnCommitFailure = false;
-
-
- /**
- * Set the transaction synchronization by the name of the corresponding constant
- * in this class, e.g. "SYNCHRONIZATION_ALWAYS".
- * @param constantName name of the constant
- * @see #SYNCHRONIZATION_ALWAYS
- */
- public final void setTransactionSynchronizationName(String constantName) {
- setTransactionSynchronization(constants.asNumber(constantName).intValue());
- }
-
- /**
- * Set when this transaction manager should activate the subscriber context-bound
- * transaction synchronization support. Default is "always".
- *
Note that transaction synchronization isn't supported for
- * multiple concurrent transactions by different transaction managers.
- * Only one transaction manager is allowed to activate it at any time.
- * @see #SYNCHRONIZATION_ALWAYS
- * @see #SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
- * @see #SYNCHRONIZATION_NEVER
- * @see ReactiveTransactionSynchronizationManager
- * @see ReactiveTransactionSynchronization
- */
- public final void setTransactionSynchronization(int transactionSynchronization) {
- this.transactionSynchronization = transactionSynchronization;
- }
-
- /**
- * Return if this transaction manager should activate the subscriber context-bound
- * transaction synchronization support.
- */
- public final int getTransactionSynchronization() {
- return this.transactionSynchronization;
- }
-
- /**
- * Specify the default timeout that this transaction manager should apply
- * if there is no timeout specified at the transaction level, in seconds.
- *
Default is the underlying transaction infrastructure's default timeout,
- * e.g. typically 30 seconds in case of a JTA provider, indicated by the
- * {@code TransactionDefinition.TIMEOUT_DEFAULT} value.
- * @see org.springframework.transaction.TransactionDefinition#TIMEOUT_DEFAULT
- */
- public final void setDefaultTimeout(Duration defaultTimeout) {
- Assert.notNull(defaultTimeout, "Default timeout must not be null");
- if (defaultTimeout.getSeconds() < TransactionDefinition.TIMEOUT_DEFAULT) {
- throw new InvalidTimeoutException("Invalid default timeout", (int) defaultTimeout.getSeconds());
- }
- this.defaultTimeout = defaultTimeout;
- }
-
- /**
- * Return the default timeout that this transaction manager should apply
- * if there is no timeout specified at the transaction level, in seconds.
- *
Returns {@code TransactionDefinition.TIMEOUT_DEFAULT} to indicate
- * the underlying transaction infrastructure's default timeout.
- */
- public final Duration getDefaultTimeout() {
- return this.defaultTimeout;
- }
-
- /**
- * Set whether nested transactions are allowed. Default is "false".
- *
Typically initialized with an appropriate default by the
- * concrete transaction manager subclass.
- */
- public final void setNestedTransactionAllowed(boolean nestedTransactionAllowed) {
- this.nestedTransactionAllowed = nestedTransactionAllowed;
- }
-
- /**
- * Return whether nested transactions are allowed.
- */
- public final boolean isNestedTransactionAllowed() {
- return this.nestedTransactionAllowed;
- }
-
- /**
- * Set whether existing transactions should be validated before participating
- * in them.
- *
When participating in an existing transaction (e.g. with
- * PROPAGATION_REQUIRED or PROPAGATION_SUPPORTS encountering an existing
- * transaction), this outer transaction's characteristics will apply even
- * to the inner transaction scope. Validation will detect incompatible
- * isolation level and read-only settings on the inner transaction definition
- * and reject participation accordingly through throwing a corresponding exception.
- *
Default is "false", leniently ignoring inner transaction settings,
- * simply overriding them with the outer transaction's characteristics.
- * Switch this flag to "true" in order to enforce strict validation.
- */
- public final void setValidateExistingTransaction(boolean validateExistingTransaction) {
- this.validateExistingTransaction = validateExistingTransaction;
- }
-
- /**
- * Return whether existing transactions should be validated before participating
- * in them.
- */
- public final boolean isValidateExistingTransaction() {
- return this.validateExistingTransaction;
- }
-
- /**
- * Set whether to globally mark an existing transaction as rollback-only
- * after a participating transaction failed.
- *
Default is "true": If a participating transaction (e.g. with
- * PROPAGATION_REQUIRED or PROPAGATION_SUPPORTS encountering an existing
- * transaction) fails, the transaction will be globally marked as rollback-only.
- * The only possible outcome of such a transaction is a rollback: The
- * transaction originator cannot make the transaction commit anymore.
- *
Switch this to "false" to let the transaction originator make the rollback
- * decision. If a participating transaction fails with an exception, the caller
- * can still decide to continue with a different path within the transaction.
- * However, note that this will only work as long as all participating resources
- * are capable of continuing towards a transaction commit even after a data access
- * failure: This is generally not the case for a Hibernate Session, for example;
- * neither is it for a sequence of R2DBC insert/update/delete operations.
- *
Note:This flag only applies to an explicit rollback attempt for a
- * subtransaction, typically caused by an exception thrown by a data access operation
- * (where TransactionInterceptor will trigger a {@code ReactiveTransactionManager.rollback()}
- * call according to a rollback rule). If the flag is off, the caller can handle the exception
- * and decide on a rollback, independent of the rollback rules of the subtransaction.
- * This flag does, however, not apply to explicit {@code setRollbackOnly}
- * calls on a {@code TransactionStatus}, which will always cause an eventual
- * global rollback (as it might not throw an exception after the rollback-only call).
- *
The recommended solution for handling failure of a subtransaction
- * is a "nested transaction", where the global transaction can be rolled
- * back to a savepoint taken at the beginning of the subtransaction.
- * PROPAGATION_NESTED provides exactly those semantics; however, it will
- * only work when nested transaction support is available. This is the case
- * with DataSourceTransactionManager, but not with JtaTransactionManager.
- * @see #setNestedTransactionAllowed
- */
- public final void setGlobalRollbackOnParticipationFailure(boolean globalRollbackOnParticipationFailure) {
- this.globalRollbackOnParticipationFailure = globalRollbackOnParticipationFailure;
- }
-
- /**
- * Return whether to globally mark an existing transaction as rollback-only
- * after a participating transaction failed.
- */
- public final boolean isGlobalRollbackOnParticipationFailure() {
- return this.globalRollbackOnParticipationFailure;
- }
-
- /**
- * Set whether to fail early in case of the transaction being globally marked
- * as rollback-only.
- *
Default is "false", only causing an UnexpectedRollbackException at the
- * outermost transaction boundary. Switch this flag on to cause an
- * UnexpectedRollbackException as early as the global rollback-only marker
- * has been first detected, even from within an inner transaction boundary.
- * @see org.springframework.transaction.UnexpectedRollbackException
- */
- public final void setFailEarlyOnGlobalRollbackOnly(boolean failEarlyOnGlobalRollbackOnly) {
- this.failEarlyOnGlobalRollbackOnly = failEarlyOnGlobalRollbackOnly;
- }
-
- /**
- * Return whether to fail early in case of the transaction being globally marked
- * as rollback-only.
- */
- public final boolean isFailEarlyOnGlobalRollbackOnly() {
- return this.failEarlyOnGlobalRollbackOnly;
- }
-
- /**
- * Set whether {@code doRollback} should be performed on failure of the
- * {@code doCommit} call. Typically not necessary and thus to be avoided,
- * as it can potentially override the commit exception with a subsequent
- * rollback exception.
- *
Default is "false".
- * @see #doCommit
- * @see #doRollback
- */
- public final void setRollbackOnCommitFailure(boolean rollbackOnCommitFailure) {
- this.rollbackOnCommitFailure = rollbackOnCommitFailure;
- }
-
- /**
- * Return whether {@code doRollback} should be performed on failure of the
- * {@code doCommit} call.
- */
- public final boolean isRollbackOnCommitFailure() {
- return this.rollbackOnCommitFailure;
- }
//---------------------------------------------------------------------
// Implementation of ReactiveTransactionManager
@@ -332,15 +95,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* @see #doBegin
*/
@Override
- public final Mono getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
-
- if (definition == null) {
- // Use defaults if no transaction definition given.
- definition = new DefaultTransactionDefinition();
- }
-
- TransactionDefinition definitionToUse = definition;
-
+ public final Mono getTransaction(TransactionDefinition definition) throws TransactionException {
return ReactiveTransactionSynchronizationManager.currentTransaction()
.flatMap(synchronizationManager -> {
@@ -351,58 +106,52 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
- return handleExistingTransaction(synchronizationManager, definitionToUse, transaction, debugEnabled);
+ return handleExistingTransaction(synchronizationManager, definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
- if (definitionToUse.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
- return Mono.error(new InvalidTimeoutException("Invalid transaction timeout", definitionToUse.getTimeout()));
+ if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
+ return Mono.error(new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()));
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
- if (definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
+ if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
return Mono.error(new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'"));
- } else if (definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
- definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
- definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
+ }
+ else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
+ definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
+ definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
return TransactionContextManager.currentContext()
.map(ReactiveTransactionSynchronizationManager::new)
- .flatMap(nestedSynchronizationManager -> {
-
- return suspend(nestedSynchronizationManager, null)
- .map(Optional::of)
- .defaultIfEmpty(Optional.empty())
- .flatMap(suspendedResources -> {
-
- if (debugEnabled) {
- logger.debug("Creating new transaction with name [" + definitionToUse.getName() + "]: " + definitionToUse);
- }
-
- return Mono.defer(() -> {
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- DefaultReactiveTransactionStatus status = newTransactionStatus(
- nestedSynchronizationManager, definitionToUse, transaction, true,
- newSynchronization, debugEnabled, suspendedResources.orElse(null));
-
- return doBegin(nestedSynchronizationManager, transaction, definitionToUse)
- .doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, definitionToUse))
- .thenReturn(status);
- }).onErrorResume(ErrorPredicates.RuntimeOrError, e -> {
- return resume(nestedSynchronizationManager, null, suspendedResources.orElse(null))
- .then(Mono.error(e));
- });
- });
- });
- } else {
+ .flatMap(nestedSynchronizationManager ->
+ suspend(nestedSynchronizationManager, null)
+ .map(Optional::of)
+ .defaultIfEmpty(Optional.empty())
+ .flatMap(suspendedResources -> {
+ if (debugEnabled) {
+ logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
+ }
+ return Mono.defer(() -> {
+ DefaultReactiveTransactionStatus status = newTransactionStatus(
+ nestedSynchronizationManager, definition, transaction, true,
+ debugEnabled, suspendedResources.orElse(null));
+ return doBegin(nestedSynchronizationManager, transaction, definition)
+ .doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, definition))
+ .thenReturn(status);
+ }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR,
+ ex -> resume(nestedSynchronizationManager, null, suspendedResources.orElse(null))
+ .then(Mono.error(ex)));
+ }));
+ }
+ else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
- if (definitionToUse.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
+ if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
- "isolation level will effectively be ignored: " + definitionToUse);
+ "isolation level will effectively be ignored: " + definition);
}
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- return Mono.just(prepareTransactionStatus(synchronizationManager, definitionToUse, null, true, newSynchronization, debugEnabled, null));
+ return Mono.just(prepareTransactionStatus(synchronizationManager, definition, null, true, debugEnabled, null));
}
});
}
@@ -411,8 +160,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* Create a TransactionStatus for an existing transaction.
*/
private Mono handleExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager,
- TransactionDefinition definition, Object transaction, boolean debugEnabled)
- throws TransactionException {
+ TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
return Mono.error(new IllegalTransactionStateException(
@@ -424,12 +172,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
logger.debug("Suspending current transaction");
}
Mono suspend = suspend(synchronizationManager, transaction);
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
-
return suspend.map(suspendedResources -> prepareTransactionStatus(synchronizationManager,
- definition, null, false, newSynchronization, debugEnabled, suspendedResources)) //
+ definition, null, false, debugEnabled, suspendedResources)) //
.switchIfEmpty(Mono.fromSupplier(() -> prepareTransactionStatus(synchronizationManager,
- definition, null, false, newSynchronization, debugEnabled, null)))
+ definition, null, false, debugEnabled, null)))
.cast(ReactiveTransactionStatus.class);
}
@@ -439,82 +185,46 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
definition.getName() + "]");
}
Mono suspendedResources = suspend(synchronizationManager, transaction);
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
-
return suspendedResources.flatMap(suspendedResourcesHolder -> {
-
DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager,
- definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
- return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> {
- prepareSynchronization(synchronizationManager, status, definition);
- }).thenReturn(status).
-
- onErrorResume(ErrorPredicates.RuntimeOrError, beginEx -> {
- return resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx).then(Mono.error(beginEx));
- });
+ definition, transaction, true, debugEnabled, suspendedResources);
+ return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
+ prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status)
+ .onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx ->
+ resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx).then(Mono.error(beginEx)));
});
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- if (!isNestedTransactionAllowed()) {
- return Mono.error(new NestedTransactionNotSupportedException(
- "Transaction manager does not allow nested transactions by default - " +
- "specify 'nestedTransactionAllowed' property with value 'true'"));
- }
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
-
// Nested transaction through nested begin and commit/rollback calls.
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager,
- definition, transaction, true, newSynchronization, debugEnabled, null);
-
- return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> {
- prepareSynchronization(synchronizationManager, status, definition);
- }).thenReturn(status);
+ definition, transaction, true, debugEnabled, null);
+ return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
+ prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status);
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
- if (isValidateExistingTransaction()) {
- if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
- Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
- if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
- Constants isoConstants = new Constants(TransactionDefinition.class);
- return Mono.error(new IllegalTransactionStateException("Participating transaction with definition [" +
- definition + "] specifies isolation level which is incompatible with existing transaction: " +
- (currentIsolationLevel != null ?
- isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
- "(unknown)")));
- }
- }
- if (!definition.isReadOnly()) {
- if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
- return Mono.error(new IllegalTransactionStateException("Participating transaction with definition [" +
- definition + "] is not marked as read-only but existing transaction is"));
- }
- }
- }
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, newSynchronization, debugEnabled, null));
+ return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, debugEnabled, null));
}
/**
* Create a new TransactionStatus for the given arguments,
* also initializing transaction synchronization as appropriate.
- *
* @see #newTransactionStatus
* @see #prepareTransactionStatus
*/
- protected final DefaultReactiveTransactionStatus prepareTransactionStatus(
- ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
- boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
+ private DefaultReactiveTransactionStatus prepareTransactionStatus(
+ ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
+ @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager,
- definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
+ definition, transaction, newTransaction, debug, suspendedResources);
prepareSynchronization(synchronizationManager, status, definition);
return status;
}
@@ -522,21 +232,21 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
/**
* Create a TransactionStatus instance for the given arguments.
*/
- protected DefaultReactiveTransactionStatus newTransactionStatus(
- ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
- boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
+ private DefaultReactiveTransactionStatus newTransactionStatus(
+ ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
+ @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
- boolean actualNewSynchronization = newSynchronization &&
- !synchronizationManager.isSynchronizationActive();
- return new DefaultReactiveTransactionStatus(
- transaction, newTransaction, actualNewSynchronization,
+ return new DefaultReactiveTransactionStatus(transaction, newTransaction,
+ !synchronizationManager.isSynchronizationActive(),
definition.isReadOnly(), debug, suspendedResources);
}
/**
* Initialize transaction synchronization as appropriate.
*/
- protected void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, TransactionDefinition definition) {
+ private void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager,
+ DefaultReactiveTransactionStatus status, TransactionDefinition definition) {
+
if (status.isNewSynchronization()) {
synchronizationManager.setActualTransactionActive(status.hasTransaction());
synchronizationManager.setCurrentTransactionIsolationLevel(
@@ -548,22 +258,6 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
}
}
- /**
- * Determine the actual timeout to use for the given definition.
- * Will fall back to this manager's default timeout if the
- * transaction definition doesn't specify a non-default value.
- * @param definition the transaction definition
- * @return the actual timeout to use
- * @see org.springframework.transaction.TransactionDefinition#getTimeout()
- * @see #setDefaultTimeout
- */
- protected Duration determineTimeout(TransactionDefinition definition) {
- if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
- return Duration.ofSeconds(definition.getTimeout());
- }
- return this.defaultTimeout;
- }
-
/**
* Suspend the given transaction. Suspends transaction synchronization first,
@@ -576,16 +270,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* @see #doSuspend
* @see #resume
*/
- protected final Mono suspend(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction) throws TransactionException {
+ private Mono suspend(ReactiveTransactionSynchronizationManager synchronizationManager,
+ @Nullable Object transaction) throws TransactionException {
+
if (synchronizationManager.isSynchronizationActive()) {
Mono> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager);
-
return suspendedSynchronizations.flatMap(synchronizations -> {
-
- Mono> suspendedResources = transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty());
-
+ Mono> suspendedResources = (transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty()));
return suspendedResources.map(it -> {
-
String name = synchronizationManager.getCurrentTransactionName();
synchronizationManager.setCurrentTransactionName(null);
boolean readOnly = synchronizationManager.isCurrentTransactionReadOnly();
@@ -596,13 +288,15 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
synchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
it.orElse(null), synchronizations, name, readOnly, isolationLevel, wasActive);
- }).onErrorResume(ErrorPredicates.RuntimeOrError, t -> doResumeSynchronization(synchronizationManager, synchronizations).cast(SuspendedResourcesHolder.class));
+ }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> doResumeSynchronization(synchronizationManager, synchronizations).cast(SuspendedResourcesHolder.class));
});
- } else if (transaction != null) {
+ }
+ else if (transaction != null) {
// Transaction active but no synchronization active.
Mono> suspendedResources = doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty());
return suspendedResources.map(it -> new SuspendedResourcesHolder(it.orElse(null)));
- } else {
+ }
+ else {
// Neither transaction nor synchronization active.
return Mono.empty();
}
@@ -619,7 +313,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* @see #doResume
* @see #suspend
*/
- protected final Mono resume(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
+ private Mono resume(ReactiveTransactionSynchronizationManager synchronizationManager,
+ @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
@@ -644,23 +339,23 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* Resume outer transaction after inner transaction begin failed.
*/
private Mono resumeAfterBeginException(ReactiveTransactionSynchronizationManager synchronizationManager,
- Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) {
+ Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) {
String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception";
- return resume(synchronizationManager, transaction, suspendedResources).doOnError(ErrorPredicates.RuntimeOrError, t -> logger.error(exMessage, beginEx));
+ return resume(synchronizationManager, transaction, suspendedResources).doOnError(ErrorPredicates.RUNTIME_OR_ERROR,
+ ex -> logger.error(exMessage, beginEx));
}
/**
* Suspend all current synchronizations and deactivate transaction
* synchronization for the current transaction context.
- *
* @param synchronizationManager the synchronization manager bound to the current transaction
* @return the List of suspended ReactiveTransactionSynchronization objects
*/
- private Mono> doSuspendSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager) {
- List suspendedSynchronizations =
- synchronizationManager.getSynchronizations();
+ private Mono> doSuspendSynchronization(
+ ReactiveTransactionSynchronizationManager synchronizationManager) {
+ List suspendedSynchronizations = synchronizationManager.getSynchronizations();
return Flux.fromIterable(suspendedSynchronizations)
.concatMap(ReactiveTransactionSynchronization::suspend)
.then(Mono.defer(() -> {
@@ -675,14 +370,13 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* @param synchronizationManager the synchronization manager bound to the current transaction
* @param suspendedSynchronizations a List of ReactiveTransactionSynchronization objects
*/
- private Mono doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, List suspendedSynchronizations) {
- synchronizationManager.initSynchronization();
+ private Mono doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager,
+ List suspendedSynchronizations) {
+ synchronizationManager.initSynchronization();
return Flux.fromIterable(suspendedSynchronizations)
- .concatMap(synchronization -> {
- return synchronization.resume()
- .doOnSuccess(ignore -> synchronizationManager.registerSynchronization(synchronization));
- }).then();
+ .concatMap(synchronization -> synchronization.resume()
+ .doOnSuccess(ignore -> synchronizationManager.registerSynchronization(synchronization))).then();
}
@@ -703,22 +397,13 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
}
return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> {
-
DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status;
- if (defStatus.isLocalRollbackOnly()) {
+ if (defStatus.isRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
- return processRollback(synchronizationManager, defStatus, false);
+ return processRollback(synchronizationManager, defStatus);
}
-
- if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
- }
- return processRollback(synchronizationManager, defStatus, true);
- }
-
return processCommit(synchronizationManager, defStatus);
});
}
@@ -730,90 +415,55 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* @param status object representing the transaction
* @throws TransactionException in case of commit failure
*/
- private Mono processCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException {
+ private Mono processCommit(ReactiveTransactionSynchronizationManager synchronizationManager,
+ DefaultReactiveTransactionStatus status) throws TransactionException {
AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false);
- AtomicBoolean unexpectedRollback = new AtomicBoolean(false);
Mono