Shorter class names for common reactive transaction API types
Introduces TransactionExecution base interface for TransactionStatus as well as ReactiveTransaction. Renames getTransaction method to getReactiveTransaction, allowing for combined implementations of PlatformTransactionManager and ReactiveTransactionManager. See gh-22646
This commit is contained in:
parent
b5e5e33078
commit
8dabb3e626
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.transaction;
|
||||
|
||||
/**
|
||||
* Representation of an ongoing reactive transaction.
|
||||
* This is currently a marker interface extending {@link TransactionExecution}
|
||||
* but may acquire further methods in a future revision.
|
||||
*
|
||||
* <p>Transactional code can use this to retrieve status information,
|
||||
* and to programmatically request a rollback (instead of throwing
|
||||
* an exception that causes an implicit rollback).
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @see #setRollbackOnly()
|
||||
* @see ReactiveTransactionManager#getReactiveTransaction
|
||||
*/
|
||||
public interface ReactiveTransaction extends TransactionExecution {
|
||||
|
||||
}
|
||||
|
|
@ -32,7 +32,7 @@ import reactor.core.publisher.Mono;
|
|||
public interface ReactiveTransactionManager {
|
||||
|
||||
/**
|
||||
* Emit a currently active transaction or create a new one, according to
|
||||
* Emit a currently active reactive transaction or create a new one, according to
|
||||
* the specified propagation behavior.
|
||||
* <p>Note that parameters like isolation level or timeout will only be applied
|
||||
* to new transactions, and thus be ignored when participating in active ones.
|
||||
|
|
@ -54,7 +54,7 @@ public interface ReactiveTransactionManager {
|
|||
* @see TransactionDefinition#getTimeout
|
||||
* @see TransactionDefinition#isReadOnly
|
||||
*/
|
||||
Mono<ReactiveTransactionStatus> getTransaction(TransactionDefinition definition) throws TransactionException;
|
||||
Mono<ReactiveTransaction> getReactiveTransaction(TransactionDefinition definition) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Commit the given transaction, with regard to its status. If the transaction
|
||||
|
|
@ -72,7 +72,7 @@ public interface ReactiveTransactionManager {
|
|||
* database right before commit, with the resulting DataAccessException
|
||||
* causing the transaction to fail. The original exception will be
|
||||
* propagated to the caller of this commit method in such a case.
|
||||
* @param status object returned by the {@code getTransaction} method
|
||||
* @param transaction object returned by the {@code getTransaction} method
|
||||
* @throws UnexpectedRollbackException in case of an unexpected rollback
|
||||
* that the transaction coordinator initiated
|
||||
* @throws HeuristicCompletionException in case of a transaction failure
|
||||
|
|
@ -81,9 +81,9 @@ public interface ReactiveTransactionManager {
|
|||
* (typically caused by fundamental resource failures)
|
||||
* @throws IllegalTransactionStateException if the given transaction
|
||||
* is already completed (that is, committed or rolled back)
|
||||
* @see ReactiveTransactionStatus#setRollbackOnly
|
||||
* @see ReactiveTransaction#setRollbackOnly
|
||||
*/
|
||||
Mono<Void> commit(ReactiveTransactionStatus status) throws TransactionException;
|
||||
Mono<Void> commit(ReactiveTransaction transaction) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Perform a rollback of the given transaction.
|
||||
|
|
@ -95,12 +95,12 @@ public interface ReactiveTransactionManager {
|
|||
* The transaction will already have been completed and cleaned up when commit
|
||||
* returns, even in case of a commit exception. Consequently, a rollback call
|
||||
* after commit failure will lead to an IllegalTransactionStateException.
|
||||
* @param status object returned by the {@code getTransaction} method
|
||||
* @param transaction object returned by the {@code getTransaction} method
|
||||
* @throws TransactionSystemException in case of rollback or system errors
|
||||
* (typically caused by fundamental resource failures)
|
||||
* @throws IllegalTransactionStateException if the given transaction
|
||||
* is already completed (that is, committed or rolled back)
|
||||
*/
|
||||
Mono<Void> rollback(ReactiveTransactionStatus status) throws TransactionException;
|
||||
Mono<Void> rollback(ReactiveTransaction transaction) throws TransactionException;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,20 +17,14 @@
|
|||
package org.springframework.transaction;
|
||||
|
||||
/**
|
||||
* Representation of the status of a transaction exposing a reactive
|
||||
* interface.
|
||||
* Common representation of the current state of a transaction.
|
||||
* Serves as base interface for {@link TransactionStatus} as well as
|
||||
* {@link ReactiveTransaction}.
|
||||
*
|
||||
* <p>Transactional code can use this to retrieve status information,
|
||||
* and to programmatically request a rollback (instead of throwing
|
||||
* an exception that causes an implicit rollback).
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @see #setRollbackOnly()
|
||||
* @see ReactiveTransactionManager#getTransaction
|
||||
*/
|
||||
public interface ReactiveTransactionStatus {
|
||||
public interface TransactionExecution {
|
||||
|
||||
/**
|
||||
* Return whether the present transaction is new; otherwise participating
|
||||
|
|
@ -43,12 +37,6 @@ public interface ReactiveTransactionStatus {
|
|||
* Set the transaction rollback-only. This instructs the transaction manager
|
||||
* that the only possible outcome of the transaction may be a rollback, as
|
||||
* alternative to throwing an exception which would in turn trigger a rollback.
|
||||
* <p>This is mainly intended for transactions managed by
|
||||
* {@link org.springframework.transaction.reactive.TransactionalOperator} or
|
||||
* {@link org.springframework.transaction.interceptor.TransactionInterceptor},
|
||||
* where the actual commit/rollback decision is made by the container.
|
||||
* @see org.springframework.transaction.reactive.ReactiveTransactionCallback#doInTransaction
|
||||
* @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn
|
||||
*/
|
||||
void setRollbackOnly();
|
||||
|
||||
|
|
@ -61,8 +49,6 @@ public interface ReactiveTransactionStatus {
|
|||
/**
|
||||
* Return whether this transaction is completed, that is,
|
||||
* whether it has already been committed or rolled back.
|
||||
* @see ReactiveTransactionManager#commit
|
||||
* @see ReactiveTransactionManager#rollback
|
||||
*/
|
||||
boolean isCompleted();
|
||||
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -36,14 +36,7 @@ import java.io.Flushable;
|
|||
* @see org.springframework.transaction.support.TransactionCallback#doInTransaction
|
||||
* @see org.springframework.transaction.interceptor.TransactionInterceptor#currentTransactionStatus()
|
||||
*/
|
||||
public interface TransactionStatus extends SavepointManager, Flushable {
|
||||
|
||||
/**
|
||||
* Return whether the present transaction is new; otherwise participating
|
||||
* in an existing transaction, or potentially not running in an actual
|
||||
* transaction in the first place.
|
||||
*/
|
||||
boolean isNewTransaction();
|
||||
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
|
||||
|
||||
/**
|
||||
* Return whether this transaction internally carries a savepoint,
|
||||
|
|
@ -58,25 +51,6 @@ public interface TransactionStatus extends SavepointManager, Flushable {
|
|||
*/
|
||||
boolean hasSavepoint();
|
||||
|
||||
/**
|
||||
* Set the transaction rollback-only. This instructs the transaction manager
|
||||
* that the only possible outcome of the transaction may be a rollback, as
|
||||
* alternative to throwing an exception which would in turn trigger a rollback.
|
||||
* <p>This is mainly intended for transactions managed by
|
||||
* {@link org.springframework.transaction.support.TransactionTemplate} or
|
||||
* {@link org.springframework.transaction.interceptor.TransactionInterceptor},
|
||||
* where the actual commit/rollback decision is made by the container.
|
||||
* @see org.springframework.transaction.support.TransactionCallback#doInTransaction
|
||||
* @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn
|
||||
*/
|
||||
void setRollbackOnly();
|
||||
|
||||
/**
|
||||
* Return whether the transaction has been marked as rollback-only
|
||||
* (either by the application or by the transaction infrastructure).
|
||||
*/
|
||||
boolean isRollbackOnly();
|
||||
|
||||
/**
|
||||
* Flush the underlying session to the datastore, if applicable:
|
||||
* for example, all affected Hibernate/JPA sessions.
|
||||
|
|
@ -88,12 +62,4 @@ public interface TransactionStatus extends SavepointManager, Flushable {
|
|||
@Override
|
||||
void flush();
|
||||
|
||||
/**
|
||||
* Return whether this transaction is completed, that is,
|
||||
* whether it has already been committed or rolled back.
|
||||
* @see PlatformTransactionManager#commit
|
||||
* @see PlatformTransactionManager#rollback
|
||||
*/
|
||||
boolean isCompleted();
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,8 +32,8 @@ import reactor.core.publisher.Mono;
|
|||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.transaction.IllegalTransactionStateException;
|
||||
import org.springframework.transaction.InvalidTimeoutException;
|
||||
import org.springframework.transaction.ReactiveTransaction;
|
||||
import org.springframework.transaction.ReactiveTransactionManager;
|
||||
import org.springframework.transaction.ReactiveTransactionStatus;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.TransactionException;
|
||||
import org.springframework.transaction.TransactionSuspensionNotSupportedException;
|
||||
|
|
@ -74,7 +74,7 @@ import org.springframework.transaction.UnexpectedRollbackException;
|
|||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @see ReactiveTransactionSynchronizationManager
|
||||
* @see TransactionSynchronizationManager
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
|
||||
|
|
@ -95,8 +95,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @see #doBegin
|
||||
*/
|
||||
@Override
|
||||
public final Mono<ReactiveTransactionStatus> getTransaction(TransactionDefinition definition) throws TransactionException {
|
||||
return ReactiveTransactionSynchronizationManager.currentTransaction()
|
||||
public final Mono<ReactiveTransaction> getReactiveTransaction(TransactionDefinition definition) throws TransactionException {
|
||||
return TransactionSynchronizationManager.currentTransaction()
|
||||
.flatMap(synchronizationManager -> {
|
||||
|
||||
Object transaction = doGetTransaction(synchronizationManager);
|
||||
|
|
@ -124,7 +124,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
|
||||
|
||||
return TransactionContextManager.currentContext()
|
||||
.map(ReactiveTransactionSynchronizationManager::new)
|
||||
.map(TransactionSynchronizationManager::new)
|
||||
.flatMap(nestedSynchronizationManager ->
|
||||
suspend(nestedSynchronizationManager, null)
|
||||
.map(Optional::of)
|
||||
|
|
@ -134,7 +134,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
|
||||
}
|
||||
return Mono.defer(() -> {
|
||||
DefaultReactiveTransactionStatus status = newTransactionStatus(
|
||||
GenericReactiveTransaction status = newReactiveTransaction(
|
||||
nestedSynchronizationManager, definition, transaction, true,
|
||||
debugEnabled, suspendedResources.orElse(null));
|
||||
return doBegin(nestedSynchronizationManager, transaction, definition)
|
||||
|
|
@ -151,15 +151,15 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
|
||||
"isolation level will effectively be ignored: " + definition);
|
||||
}
|
||||
return Mono.just(prepareTransactionStatus(synchronizationManager, definition, null, true, debugEnabled, null));
|
||||
return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, null, true, debugEnabled, null));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a TransactionStatus for an existing transaction.
|
||||
* Create a ReactiveTransaction for an existing transaction.
|
||||
*/
|
||||
private Mono<ReactiveTransactionStatus> handleExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
private Mono<ReactiveTransaction> handleExistingTransaction(TransactionSynchronizationManager synchronizationManager,
|
||||
TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
|
||||
|
||||
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
|
||||
|
|
@ -172,11 +172,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
logger.debug("Suspending current transaction");
|
||||
}
|
||||
Mono<SuspendedResourcesHolder> suspend = suspend(synchronizationManager, transaction);
|
||||
return suspend.map(suspendedResources -> prepareTransactionStatus(synchronizationManager,
|
||||
return suspend.map(suspendedResources -> prepareReactiveTransaction(synchronizationManager,
|
||||
definition, null, false, debugEnabled, suspendedResources)) //
|
||||
.switchIfEmpty(Mono.fromSupplier(() -> prepareTransactionStatus(synchronizationManager,
|
||||
.switchIfEmpty(Mono.fromSupplier(() -> prepareReactiveTransaction(synchronizationManager,
|
||||
definition, null, false, debugEnabled, null)))
|
||||
.cast(ReactiveTransactionStatus.class);
|
||||
.cast(ReactiveTransaction.class);
|
||||
}
|
||||
|
||||
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
|
||||
|
|
@ -186,7 +186,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
}
|
||||
Mono<SuspendedResourcesHolder> suspendedResources = suspend(synchronizationManager, transaction);
|
||||
return suspendedResources.flatMap(suspendedResourcesHolder -> {
|
||||
DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager,
|
||||
GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
|
||||
definition, transaction, true, debugEnabled, suspendedResources);
|
||||
return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
|
||||
prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status)
|
||||
|
|
@ -200,7 +200,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
|
||||
}
|
||||
// Nested transaction through nested begin and commit/rollback calls.
|
||||
DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager,
|
||||
GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
|
||||
definition, transaction, true, debugEnabled, null);
|
||||
return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
|
||||
prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status);
|
||||
|
|
@ -210,33 +210,33 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
if (debugEnabled) {
|
||||
logger.debug("Participating in existing transaction");
|
||||
}
|
||||
return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, debugEnabled, null));
|
||||
return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, transaction, false, debugEnabled, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new TransactionStatus for the given arguments,
|
||||
* Create a new ReactiveTransaction for the given arguments,
|
||||
* also initializing transaction synchronization as appropriate.
|
||||
* @see #newTransactionStatus
|
||||
* @see #prepareTransactionStatus
|
||||
* @see #newReactiveTransaction
|
||||
* @see #prepareReactiveTransaction
|
||||
*/
|
||||
private DefaultReactiveTransactionStatus prepareTransactionStatus(
|
||||
ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
|
||||
private GenericReactiveTransaction prepareReactiveTransaction(
|
||||
TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
|
||||
@Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
|
||||
|
||||
DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager,
|
||||
GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
|
||||
definition, transaction, newTransaction, debug, suspendedResources);
|
||||
prepareSynchronization(synchronizationManager, status, definition);
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a TransactionStatus instance for the given arguments.
|
||||
* Create a ReactiveTransaction instance for the given arguments.
|
||||
*/
|
||||
private DefaultReactiveTransactionStatus newTransactionStatus(
|
||||
ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
|
||||
private GenericReactiveTransaction newReactiveTransaction(
|
||||
TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
|
||||
@Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
|
||||
|
||||
return new DefaultReactiveTransactionStatus(transaction, newTransaction,
|
||||
return new GenericReactiveTransaction(transaction, newTransaction,
|
||||
!synchronizationManager.isSynchronizationActive(),
|
||||
definition.isReadOnly(), debug, suspendedResources);
|
||||
}
|
||||
|
|
@ -244,8 +244,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
/**
|
||||
* Initialize transaction synchronization as appropriate.
|
||||
*/
|
||||
private void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status, TransactionDefinition definition) {
|
||||
private void prepareSynchronization(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status, TransactionDefinition definition) {
|
||||
|
||||
if (status.isNewSynchronization()) {
|
||||
synchronizationManager.setActualTransactionActive(status.hasTransaction());
|
||||
|
|
@ -270,11 +270,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @see #doSuspend
|
||||
* @see #resume
|
||||
*/
|
||||
private Mono<SuspendedResourcesHolder> suspend(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
private Mono<SuspendedResourcesHolder> suspend(TransactionSynchronizationManager synchronizationManager,
|
||||
@Nullable Object transaction) throws TransactionException {
|
||||
|
||||
if (synchronizationManager.isSynchronizationActive()) {
|
||||
Mono<List<ReactiveTransactionSynchronization>> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager);
|
||||
Mono<List<TransactionSynchronization>> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager);
|
||||
return suspendedSynchronizations.flatMap(synchronizations -> {
|
||||
Mono<Optional<Object>> suspendedResources = (transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty()));
|
||||
return suspendedResources.map(it -> {
|
||||
|
|
@ -313,7 +313,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @see #doResume
|
||||
* @see #suspend
|
||||
*/
|
||||
private Mono<Void> resume(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
private Mono<Void> resume(TransactionSynchronizationManager synchronizationManager,
|
||||
@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
|
||||
throws TransactionException {
|
||||
|
||||
|
|
@ -322,7 +322,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
if (suspendedResources != null) {
|
||||
return doResume(synchronizationManager, transaction, suspendedResources);
|
||||
}
|
||||
List<ReactiveTransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
|
||||
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
|
||||
if (suspendedSynchronizations != null) {
|
||||
synchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
|
||||
synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
|
||||
|
|
@ -338,7 +338,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
/**
|
||||
* Resume outer transaction after inner transaction begin failed.
|
||||
*/
|
||||
private Mono<Void> resumeAfterBeginException(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
private Mono<Void> resumeAfterBeginException(TransactionSynchronizationManager synchronizationManager,
|
||||
Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) {
|
||||
|
||||
String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception";
|
||||
|
|
@ -350,14 +350,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* Suspend all current synchronizations and deactivate transaction
|
||||
* synchronization for the current transaction context.
|
||||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @return the List of suspended ReactiveTransactionSynchronization objects
|
||||
* @return the List of suspended TransactionSynchronization objects
|
||||
*/
|
||||
private Mono<List<ReactiveTransactionSynchronization>> doSuspendSynchronization(
|
||||
ReactiveTransactionSynchronizationManager synchronizationManager) {
|
||||
private Mono<List<TransactionSynchronization>> doSuspendSynchronization(
|
||||
TransactionSynchronizationManager synchronizationManager) {
|
||||
|
||||
List<ReactiveTransactionSynchronization> suspendedSynchronizations = synchronizationManager.getSynchronizations();
|
||||
List<TransactionSynchronization> suspendedSynchronizations = synchronizationManager.getSynchronizations();
|
||||
return Flux.fromIterable(suspendedSynchronizations)
|
||||
.concatMap(ReactiveTransactionSynchronization::suspend)
|
||||
.concatMap(TransactionSynchronization::suspend)
|
||||
.then(Mono.defer(() -> {
|
||||
synchronizationManager.clearSynchronization();
|
||||
return Mono.just(suspendedSynchronizations);
|
||||
|
|
@ -368,10 +368,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* Reactivate transaction synchronization for the current transaction context
|
||||
* and resume all given synchronizations.
|
||||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param suspendedSynchronizations a List of ReactiveTransactionSynchronization objects
|
||||
* @param suspendedSynchronizations a List of TransactionSynchronization objects
|
||||
*/
|
||||
private Mono<Void> doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
List<ReactiveTransactionSynchronization> suspendedSynchronizations) {
|
||||
private Mono<Void> doResumeSynchronization(TransactionSynchronizationManager synchronizationManager,
|
||||
List<TransactionSynchronization> suspendedSynchronizations) {
|
||||
|
||||
synchronizationManager.initSynchronization();
|
||||
return Flux.fromIterable(suspendedSynchronizations)
|
||||
|
|
@ -385,26 +385,26 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* transactions and programmatic rollback requests.
|
||||
* Delegates to {@code isRollbackOnly}, {@code doCommit}
|
||||
* and {@code rollback}.
|
||||
* @see ReactiveTransactionStatus#isRollbackOnly()
|
||||
* @see ReactiveTransaction#isRollbackOnly()
|
||||
* @see #doCommit
|
||||
* @see #rollback
|
||||
*/
|
||||
@Override
|
||||
public final Mono<Void> commit(ReactiveTransactionStatus status) throws TransactionException {
|
||||
if (status.isCompleted()) {
|
||||
public final Mono<Void> commit(ReactiveTransaction transaction) throws TransactionException {
|
||||
if (transaction.isCompleted()) {
|
||||
return Mono.error(new IllegalTransactionStateException(
|
||||
"Transaction is already completed - do not call commit or rollback more than once per transaction"));
|
||||
}
|
||||
|
||||
return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> {
|
||||
DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status;
|
||||
if (defStatus.isRollbackOnly()) {
|
||||
if (defStatus.isDebug()) {
|
||||
return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> {
|
||||
GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction;
|
||||
if (reactiveTx.isRollbackOnly()) {
|
||||
if (reactiveTx.isDebug()) {
|
||||
logger.debug("Transactional code has requested rollback");
|
||||
}
|
||||
return processRollback(synchronizationManager, defStatus);
|
||||
return processRollback(synchronizationManager, reactiveTx);
|
||||
}
|
||||
return processCommit(synchronizationManager, defStatus);
|
||||
return processCommit(synchronizationManager, reactiveTx);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -415,8 +415,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param status object representing the transaction
|
||||
* @throws TransactionException in case of commit failure
|
||||
*/
|
||||
private Mono<Void> processCommit(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) throws TransactionException {
|
||||
private Mono<Void> processCommit(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) throws TransactionException {
|
||||
|
||||
AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false);
|
||||
|
||||
|
|
@ -435,10 +435,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
})).then(Mono.empty().onErrorResume(ex -> {
|
||||
Mono<Object> propagateException = Mono.error(ex);
|
||||
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
|
||||
return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException);
|
||||
return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException);
|
||||
}
|
||||
if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) {
|
||||
triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN).then(propagateException);
|
||||
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN).then(propagateException);
|
||||
}
|
||||
if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
|
||||
Mono<Void> mono;
|
||||
|
|
@ -453,8 +453,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
|
||||
return propagateException;
|
||||
})).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
|
||||
triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
|
||||
.then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED))));
|
||||
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
|
||||
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))));
|
||||
|
||||
return commit
|
||||
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status)
|
||||
|
|
@ -468,14 +468,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @see #doSetRollbackOnly
|
||||
*/
|
||||
@Override
|
||||
public final Mono<Void> rollback(ReactiveTransactionStatus status) throws TransactionException {
|
||||
if (status.isCompleted()) {
|
||||
public final Mono<Void> rollback(ReactiveTransaction transaction) throws TransactionException {
|
||||
if (transaction.isCompleted()) {
|
||||
return Mono.error(new IllegalTransactionStateException(
|
||||
"Transaction is already completed - do not call commit or rollback more than once per transaction"));
|
||||
}
|
||||
return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> {
|
||||
DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status;
|
||||
return processRollback(synchronizationManager, defStatus);
|
||||
return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> {
|
||||
GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction;
|
||||
return processRollback(synchronizationManager, reactiveTx);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -486,8 +486,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param status object representing the transaction
|
||||
* @throws TransactionException in case of rollback failure
|
||||
*/
|
||||
private Mono<Void> processRollback(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) {
|
||||
private Mono<Void> processRollback(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) {
|
||||
|
||||
return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> {
|
||||
if (status.isNewTransaction()) {
|
||||
|
|
@ -511,9 +511,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
return beforeCompletion;
|
||||
}
|
||||
})).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion(
|
||||
synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN)
|
||||
synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
|
||||
.then(Mono.error(ex)))
|
||||
.then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK)))
|
||||
.then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)))
|
||||
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex)))
|
||||
.then(cleanupAfterCompletion(synchronizationManager, status));
|
||||
}
|
||||
|
|
@ -526,8 +526,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @throws TransactionException in case of rollback failure
|
||||
* @see #doRollback
|
||||
*/
|
||||
private Mono<Void> doRollbackOnCommitException(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status, Throwable ex) throws TransactionException {
|
||||
private Mono<Void> doRollbackOnCommitException(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status, Throwable ex) throws TransactionException {
|
||||
|
||||
return Mono.defer(() -> {
|
||||
if (status.isNewTransaction()) {
|
||||
|
|
@ -545,9 +545,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
return Mono.empty();
|
||||
}).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, rbex -> {
|
||||
logger.error("Commit exception overridden by rollback exception", ex);
|
||||
return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN)
|
||||
return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
|
||||
.then(Mono.error(rbex));
|
||||
}).then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK));
|
||||
}).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK));
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -556,14 +556,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param status object representing the transaction
|
||||
*/
|
||||
private Mono<Void> triggerBeforeCommit(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) {
|
||||
private Mono<Void> triggerBeforeCommit(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) {
|
||||
|
||||
if (status.isNewSynchronization()) {
|
||||
if (status.isDebug()) {
|
||||
logger.trace("Triggering beforeCommit synchronization");
|
||||
}
|
||||
return ReactiveTransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly());
|
||||
return TransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly());
|
||||
}
|
||||
|
||||
return Mono.empty();
|
||||
|
|
@ -574,14 +574,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param status object representing the transaction
|
||||
*/
|
||||
private Mono<Void> triggerBeforeCompletion(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) {
|
||||
private Mono<Void> triggerBeforeCompletion(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) {
|
||||
|
||||
if (status.isNewSynchronization()) {
|
||||
if (status.isDebug()) {
|
||||
logger.trace("Triggering beforeCompletion synchronization");
|
||||
}
|
||||
return ReactiveTransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations());
|
||||
return TransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations());
|
||||
}
|
||||
|
||||
return Mono.empty();
|
||||
|
|
@ -592,14 +592,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param status object representing the transaction
|
||||
*/
|
||||
private Mono<Void> triggerAfterCommit(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) {
|
||||
private Mono<Void> triggerAfterCommit(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) {
|
||||
|
||||
if (status.isNewSynchronization()) {
|
||||
if (status.isDebug()) {
|
||||
logger.trace("Triggering afterCommit synchronization");
|
||||
}
|
||||
return ReactiveTransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations());
|
||||
return TransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations());
|
||||
}
|
||||
|
||||
return Mono.empty();
|
||||
|
|
@ -609,13 +609,13 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* Trigger {@code afterCompletion} callbacks.
|
||||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param status object representing the transaction
|
||||
* @param completionStatus completion status according to ReactiveTransactionSynchronization constants
|
||||
* @param completionStatus completion status according to TransactionSynchronization constants
|
||||
*/
|
||||
private Mono<Void> triggerAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status, int completionStatus) {
|
||||
private Mono<Void> triggerAfterCompletion(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status, int completionStatus) {
|
||||
|
||||
if (status.isNewSynchronization()) {
|
||||
List<ReactiveTransactionSynchronization> synchronizations = synchronizationManager.getSynchronizations();
|
||||
List<TransactionSynchronization> synchronizations = synchronizationManager.getSynchronizations();
|
||||
synchronizationManager.clearSynchronization();
|
||||
if (!status.hasTransaction() || status.isNewTransaction()) {
|
||||
if (status.isDebug()) {
|
||||
|
|
@ -638,22 +638,22 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
|
||||
/**
|
||||
* Actually invoke the {@code afterCompletion} methods of the
|
||||
* given Spring ReactiveTransactionSynchronization objects.
|
||||
* given TransactionSynchronization objects.
|
||||
* <p>To be called by this abstract manager itself, or by special implementations
|
||||
* of the {@code registerAfterCompletionWithExistingTransaction} callback.
|
||||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param synchronizations a List of ReactiveTransactionSynchronization objects
|
||||
* @param synchronizations a List of TransactionSynchronization objects
|
||||
* @param completionStatus the completion status according to the
|
||||
* constants in the ReactiveTransactionSynchronization interface
|
||||
* @see #registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager, Object, List)
|
||||
* @see ReactiveTransactionSynchronization#STATUS_COMMITTED
|
||||
* @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK
|
||||
* @see ReactiveTransactionSynchronization#STATUS_UNKNOWN
|
||||
* constants in the TransactionSynchronization interface
|
||||
* @see #registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager, Object, List)
|
||||
* @see TransactionSynchronization#STATUS_COMMITTED
|
||||
* @see TransactionSynchronization#STATUS_ROLLED_BACK
|
||||
* @see TransactionSynchronization#STATUS_UNKNOWN
|
||||
*/
|
||||
private Mono<Void> invokeAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
List<ReactiveTransactionSynchronization> synchronizations, int completionStatus) {
|
||||
private Mono<Void> invokeAfterCompletion(TransactionSynchronizationManager synchronizationManager,
|
||||
List<TransactionSynchronization> synchronizations, int completionStatus) {
|
||||
|
||||
return ReactiveTransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
|
||||
return TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -663,8 +663,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param status object representing the transaction
|
||||
* @see #doCleanupAfterCompletion
|
||||
*/
|
||||
private Mono<Void> cleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) {
|
||||
private Mono<Void> cleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) {
|
||||
|
||||
return Mono.defer(() -> {
|
||||
status.setCompleted();
|
||||
|
|
@ -710,9 +710,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @see #doBegin
|
||||
* @see #doCommit
|
||||
* @see #doRollback
|
||||
* @see DefaultReactiveTransactionStatus#getTransaction
|
||||
* @see GenericReactiveTransaction#getTransaction
|
||||
*/
|
||||
protected abstract Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) throws TransactionException;
|
||||
protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Check if the given transaction object indicates an existing transaction
|
||||
|
|
@ -752,7 +752,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @throws org.springframework.transaction.NestedTransactionNotSupportedException
|
||||
* if the underlying transaction does not support nesting (e.g. through savepoints)
|
||||
*/
|
||||
protected abstract Mono<Void> doBegin(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
protected abstract Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager,
|
||||
Object transaction, TransactionDefinition definition) throws TransactionException;
|
||||
|
||||
/**
|
||||
|
|
@ -768,7 +768,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @throws TransactionException in case of system errors
|
||||
* @see #doResume
|
||||
*/
|
||||
protected Mono<Object> doSuspend(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager,
|
||||
Object transaction) throws TransactionException {
|
||||
|
||||
throw new TransactionSuspensionNotSupportedException(
|
||||
|
|
@ -788,7 +788,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @throws TransactionException in case of system errors
|
||||
* @see #doSuspend
|
||||
*/
|
||||
protected Mono<Void> doResume(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
protected Mono<Void> doResume(TransactionSynchronizationManager synchronizationManager,
|
||||
@Nullable Object transaction, Object suspendedResources) throws TransactionException {
|
||||
|
||||
throw new TransactionSuspensionNotSupportedException(
|
||||
|
|
@ -805,8 +805,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
|
||||
* (note: do not throw TransactionException subclasses here!)
|
||||
*/
|
||||
protected Mono<Void> prepareForCommit(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) {
|
||||
protected Mono<Void> prepareForCommit(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) {
|
||||
|
||||
return Mono.empty();
|
||||
}
|
||||
|
|
@ -820,10 +820,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param status the status representation of the transaction
|
||||
* @throws TransactionException in case of commit or system errors
|
||||
* @see DefaultReactiveTransactionStatus#getTransaction
|
||||
* @see GenericReactiveTransaction#getTransaction
|
||||
*/
|
||||
protected abstract Mono<Void> doCommit(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) throws TransactionException;
|
||||
protected abstract Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Perform an actual rollback of the given transaction.
|
||||
|
|
@ -833,10 +833,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param status the status representation of the transaction
|
||||
* @throws TransactionException in case of system errors
|
||||
* @see DefaultReactiveTransactionStatus#getTransaction
|
||||
* @see GenericReactiveTransaction#getTransaction
|
||||
*/
|
||||
protected abstract Mono<Void> doRollback(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) throws TransactionException;
|
||||
protected abstract Mono<Void> doRollback(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Set the given transaction rollback-only. Only called on rollback
|
||||
|
|
@ -848,8 +848,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param status the status representation of the transaction
|
||||
* @throws TransactionException in case of system errors
|
||||
*/
|
||||
protected Mono<Void> doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
DefaultReactiveTransactionStatus status) throws TransactionException {
|
||||
protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager,
|
||||
GenericReactiveTransaction status) throws TransactionException {
|
||||
|
||||
throw new IllegalTransactionStateException(
|
||||
"Participating in existing transactions is not supported - when 'isExistingTransaction' " +
|
||||
|
|
@ -866,18 +866,18 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* chance to determine the actual outcome of the outer transaction.
|
||||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param transaction transaction object returned by {@code doGetTransaction}
|
||||
* @param synchronizations a List of ReactiveTransactionSynchronization objects
|
||||
* @param synchronizations a List of TransactionSynchronization objects
|
||||
* @throws TransactionException in case of system errors
|
||||
* @see #invokeAfterCompletion(ReactiveTransactionSynchronizationManager, List, int)
|
||||
* @see ReactiveTransactionSynchronization#afterCompletion(int)
|
||||
* @see ReactiveTransactionSynchronization#STATUS_UNKNOWN
|
||||
* @see #invokeAfterCompletion(TransactionSynchronizationManager, List, int)
|
||||
* @see TransactionSynchronization#afterCompletion(int)
|
||||
* @see TransactionSynchronization#STATUS_UNKNOWN
|
||||
*/
|
||||
protected Mono<Void> registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
Object transaction, List<ReactiveTransactionSynchronization> synchronizations) throws TransactionException {
|
||||
protected Mono<Void> registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager synchronizationManager,
|
||||
Object transaction, List<TransactionSynchronization> synchronizations) throws TransactionException {
|
||||
|
||||
logger.debug("Cannot register Spring after-completion synchronization with existing transaction - " +
|
||||
"processing Spring after-completion callbacks immediately, with outcome status 'unknown'");
|
||||
return invokeAfterCompletion(synchronizationManager, synchronizations, ReactiveTransactionSynchronization.STATUS_UNKNOWN);
|
||||
return invokeAfterCompletion(synchronizationManager, synchronizations, TransactionSynchronization.STATUS_UNKNOWN);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -888,7 +888,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @param transaction transaction object returned by {@code doGetTransaction}
|
||||
*/
|
||||
protected Mono<Void> doCleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager,
|
||||
protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager,
|
||||
Object transaction) {
|
||||
|
||||
return Mono.empty();
|
||||
|
|
@ -918,7 +918,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
private final Object suspendedResources;
|
||||
|
||||
@Nullable
|
||||
private List<ReactiveTransactionSynchronization> suspendedSynchronizations;
|
||||
private List<TransactionSynchronization> suspendedSynchronizations;
|
||||
|
||||
@Nullable
|
||||
private String name;
|
||||
|
|
@ -935,7 +935,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
}
|
||||
|
||||
private SuspendedResourcesHolder(
|
||||
@Nullable Object suspendedResources, List<ReactiveTransactionSynchronization> suspendedSynchronizations,
|
||||
@Nullable Object suspendedResources, List<TransactionSynchronization> suspendedSynchronizations,
|
||||
@Nullable String name, boolean readOnly, @Nullable Integer isolationLevel, boolean wasActive) {
|
||||
|
||||
this.suspendedResources = suspendedResources;
|
||||
|
|
|
|||
|
|
@ -1,76 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.transaction.reactive;
|
||||
|
||||
import org.springframework.transaction.ReactiveTransactionStatus;
|
||||
|
||||
/**
|
||||
* Abstract base implementation of the {@link ReactiveTransactionStatus} interface.
|
||||
*
|
||||
* <p>Pre-implements the handling of local rollback-only and completed flags.
|
||||
*
|
||||
* <p>Does not assume any specific internal transaction handling, such as an
|
||||
* underlying transaction object, and no transaction synchronization mechanism.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @see #setRollbackOnly()
|
||||
* @see #isRollbackOnly()
|
||||
* @see #setCompleted()
|
||||
* @see #isCompleted()
|
||||
* @see DefaultReactiveTransactionStatus
|
||||
*/
|
||||
public abstract class AbstractReactiveTransactionStatus implements ReactiveTransactionStatus {
|
||||
|
||||
private boolean rollbackOnly = false;
|
||||
|
||||
private boolean completed = false;
|
||||
|
||||
|
||||
//---------------------------------------------------------------------
|
||||
// Handling of current transaction state
|
||||
//---------------------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public void setRollbackOnly() {
|
||||
this.rollbackOnly = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the rollback-only flag via checking this ReactiveTransactionStatus.
|
||||
* <p>Will only return "true" if the application called {@code setRollbackOnly}
|
||||
* on this TransactionStatus object.
|
||||
*/
|
||||
@Override
|
||||
public boolean isRollbackOnly() {
|
||||
return this.rollbackOnly;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this transaction as completed, that is, committed or rolled back.
|
||||
*/
|
||||
public void setCompleted() {
|
||||
this.completed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCompleted() {
|
||||
return this.completed;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -17,11 +17,11 @@
|
|||
package org.springframework.transaction.reactive;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.transaction.ReactiveTransactionStatus;
|
||||
import org.springframework.transaction.ReactiveTransaction;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Default implementation of the {@link ReactiveTransactionStatus} interface,
|
||||
* Default implementation of the {@link ReactiveTransaction} interface,
|
||||
* used by {@link AbstractReactiveTransactionManager}. Based on the concept
|
||||
* of an underlying "transaction object".
|
||||
*
|
||||
|
|
@ -38,7 +38,7 @@ import org.springframework.util.Assert;
|
|||
* @see AbstractReactiveTransactionManager
|
||||
* @see #getTransaction
|
||||
*/
|
||||
public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactionStatus {
|
||||
public class GenericReactiveTransaction implements ReactiveTransaction {
|
||||
|
||||
@Nullable
|
||||
private final Object transaction;
|
||||
|
|
@ -54,6 +54,10 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio
|
|||
@Nullable
|
||||
private final Object suspendedResources;
|
||||
|
||||
private boolean rollbackOnly = false;
|
||||
|
||||
private boolean completed = false;
|
||||
|
||||
|
||||
/**
|
||||
* Create a new {@code DefaultReactiveTransactionStatus} instance.
|
||||
|
|
@ -70,7 +74,7 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio
|
|||
* @param suspendedResources a holder for resources that have been suspended
|
||||
* for this transaction, if any
|
||||
*/
|
||||
public DefaultReactiveTransactionStatus(
|
||||
public GenericReactiveTransaction(
|
||||
@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
|
||||
boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
|
||||
|
||||
|
|
@ -137,4 +141,31 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio
|
|||
return this.suspendedResources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRollbackOnly() {
|
||||
this.rollbackOnly = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the rollback-only flag via checking this ReactiveTransactionStatus.
|
||||
* <p>Will only return "true" if the application called {@code setRollbackOnly}
|
||||
* on this TransactionStatus object.
|
||||
*/
|
||||
@Override
|
||||
public boolean isRollbackOnly() {
|
||||
return this.rollbackOnly;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this transaction as completed, that is, committed or rolled back.
|
||||
*/
|
||||
public void setCompleted() {
|
||||
this.completed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCompleted() {
|
||||
return this.completed;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -18,40 +18,38 @@ package org.springframework.transaction.reactive;
|
|||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.transaction.support.ResourceHolder;
|
||||
|
||||
/**
|
||||
* {@link ReactiveTransactionSynchronization} implementation that manages a
|
||||
* {@link ResourceHolder} bound through {@link ReactiveTransactionSynchronizationManager}.
|
||||
* {@link TransactionSynchronization} implementation that manages a
|
||||
* resource object bound through {@link TransactionSynchronizationManager}.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @param <H> the resource holder type
|
||||
* @param <O> the resource holder type
|
||||
* @param <K> the resource key type
|
||||
*/
|
||||
public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHolder, K>
|
||||
implements ReactiveTransactionSynchronization {
|
||||
public abstract class ReactiveResourceSynchronization<O, K> implements TransactionSynchronization {
|
||||
|
||||
private final H resourceHolder;
|
||||
private final O resourceObject;
|
||||
|
||||
private final K resourceKey;
|
||||
|
||||
private final ReactiveTransactionSynchronizationManager synchronizationManager;
|
||||
private final TransactionSynchronizationManager synchronizationManager;
|
||||
|
||||
private volatile boolean holderActive = true;
|
||||
|
||||
|
||||
/**
|
||||
* Create a new ResourceHolderSynchronization for the given holder.
|
||||
* @param resourceHolder the ResourceHolder to manage
|
||||
* @param resourceKey the key to bind the ResourceHolder for
|
||||
* Create a new ReactiveResourceSynchronization for the given holder.
|
||||
* @param resourceObject the resource object to manage
|
||||
* @param resourceKey the key to bind the resource object for
|
||||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @see ReactiveTransactionSynchronizationManager#bindResource
|
||||
* @see TransactionSynchronizationManager#bindResource
|
||||
*/
|
||||
public ReactiveResourceHolderSynchronization(
|
||||
H resourceHolder, K resourceKey, ReactiveTransactionSynchronizationManager synchronizationManager) {
|
||||
public ReactiveResourceSynchronization(
|
||||
O resourceObject, K resourceKey, TransactionSynchronizationManager synchronizationManager) {
|
||||
|
||||
this.resourceHolder = resourceHolder;
|
||||
this.resourceObject = resourceObject;
|
||||
this.resourceKey = resourceKey;
|
||||
this.synchronizationManager = synchronizationManager;
|
||||
}
|
||||
|
|
@ -68,7 +66,7 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
|
|||
@Override
|
||||
public Mono<Void> resume() {
|
||||
if (this.holderActive) {
|
||||
this.synchronizationManager.bindResource(this.resourceKey, this.resourceHolder);
|
||||
this.synchronizationManager.bindResource(this.resourceKey, this.resourceObject);
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
|
|
@ -84,7 +82,7 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
|
|||
this.synchronizationManager.unbindResource(this.resourceKey);
|
||||
this.holderActive = false;
|
||||
if (shouldReleaseBeforeCompletion()) {
|
||||
return releaseResource(this.resourceHolder, this.resourceKey);
|
||||
return releaseResource(this.resourceObject, this.resourceKey);
|
||||
}
|
||||
}
|
||||
return Mono.empty();
|
||||
|
|
@ -93,7 +91,7 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
|
|||
@Override
|
||||
public Mono<Void> afterCommit() {
|
||||
if (!shouldReleaseBeforeCompletion()) {
|
||||
return processResourceAfterCommit(this.resourceHolder);
|
||||
return processResourceAfterCommit(this.resourceObject);
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
|
|
@ -109,21 +107,20 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
|
|||
// since afterCompletion might get called from a different thread.
|
||||
this.holderActive = false;
|
||||
this.synchronizationManager.unbindResourceIfPossible(this.resourceKey);
|
||||
this.resourceHolder.unbound();
|
||||
releaseNecessary = true;
|
||||
}
|
||||
else {
|
||||
releaseNecessary = shouldReleaseAfterCompletion(this.resourceHolder);
|
||||
releaseNecessary = shouldReleaseAfterCompletion(this.resourceObject);
|
||||
}
|
||||
if (releaseNecessary) {
|
||||
sync = releaseResource(this.resourceHolder, this.resourceKey);
|
||||
sync = releaseResource(this.resourceObject, this.resourceKey);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// Probably a pre-bound resource...
|
||||
sync = cleanupResource(this.resourceHolder, this.resourceKey, (status == STATUS_COMMITTED));
|
||||
sync = cleanupResource(this.resourceObject, this.resourceKey, (status == STATUS_COMMITTED));
|
||||
}
|
||||
return sync.doFinally(s -> this.resourceHolder.reset());
|
||||
return sync;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -157,7 +154,7 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
|
|||
* releasing after completion if no attempt was made before completion.
|
||||
* @see #releaseResource
|
||||
*/
|
||||
protected boolean shouldReleaseAfterCompletion(H resourceHolder) {
|
||||
protected boolean shouldReleaseAfterCompletion(O resourceHolder) {
|
||||
return !shouldReleaseBeforeCompletion();
|
||||
}
|
||||
|
||||
|
|
@ -167,27 +164,27 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
|
|||
* ({@link #shouldReleaseBeforeCompletion()}).
|
||||
* @param resourceHolder the resource holder to process
|
||||
*/
|
||||
protected Mono<Void> processResourceAfterCommit(H resourceHolder) {
|
||||
protected Mono<Void> processResourceAfterCommit(O resourceHolder) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the given resource (after it has been unbound from the thread).
|
||||
* @param resourceHolder the resource holder to process
|
||||
* @param resourceKey the key that the ResourceHolder was bound for
|
||||
* @param resourceKey the key that the resource object was bound for
|
||||
*/
|
||||
protected Mono<Void> releaseResource(H resourceHolder, K resourceKey) {
|
||||
protected Mono<Void> releaseResource(O resourceHolder, K resourceKey) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform a cleanup on the given resource (which is left bound to the thread).
|
||||
* @param resourceHolder the resource holder to process
|
||||
* @param resourceKey the key that the ResourceHolder was bound for
|
||||
* @param resourceKey the key that the resource object was bound for
|
||||
* @param committed whether the transaction has committed ({@code true})
|
||||
* or rolled back ({@code false})
|
||||
*/
|
||||
protected Mono<Void> cleanupResource(H resourceHolder, K resourceKey, boolean committed) {
|
||||
protected Mono<Void> cleanupResource(O resourceHolder, K resourceKey, boolean committed) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
|
@ -18,7 +18,7 @@ package org.springframework.transaction.reactive;
|
|||
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
import org.springframework.transaction.ReactiveTransactionStatus;
|
||||
import org.springframework.transaction.ReactiveTransaction;
|
||||
|
||||
/**
|
||||
* Callback interface for reactive transactional code. Used with {@link TransactionalOperator}'s
|
||||
|
|
@ -30,12 +30,13 @@ import org.springframework.transaction.ReactiveTransactionStatus;
|
|||
* Spring's {@link org.springframework.transaction.annotation.Transactional} annotation).
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @see TransactionalOperator
|
||||
* @param <T> the result type
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface ReactiveTransactionCallback<T> {
|
||||
public interface TransactionCallback<T> {
|
||||
|
||||
/**
|
||||
* Gets called by {@link TransactionalOperator} within a transactional context.
|
||||
|
|
@ -46,6 +47,6 @@ public interface ReactiveTransactionCallback<T> {
|
|||
* @return a result publisher
|
||||
* @see TransactionalOperator#transactional
|
||||
*/
|
||||
Publisher<T> doInTransaction(ReactiveTransactionStatus status);
|
||||
Publisher<T> doInTransaction(ReactiveTransaction status);
|
||||
|
||||
}
|
||||
|
|
@ -45,7 +45,7 @@ public class TransactionContext {
|
|||
private final Map<Object, Object> resources = new LinkedHashMap<>();
|
||||
|
||||
@Nullable
|
||||
private Set<ReactiveTransactionSynchronization> synchronizations;
|
||||
private Set<TransactionSynchronization> synchronizations;
|
||||
|
||||
private volatile @Nullable String currentTransactionName;
|
||||
|
||||
|
|
@ -85,12 +85,12 @@ public class TransactionContext {
|
|||
return this.resources;
|
||||
}
|
||||
|
||||
public void setSynchronizations(@Nullable Set<ReactiveTransactionSynchronization> synchronizations) {
|
||||
public void setSynchronizations(@Nullable Set<TransactionSynchronization> synchronizations) {
|
||||
this.synchronizations = synchronizations;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public Set<ReactiveTransactionSynchronization> getSynchronizations() {
|
||||
public Set<TransactionSynchronization> getSynchronizations() {
|
||||
return this.synchronizations;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import org.springframework.transaction.NoTransactionException;
|
|||
*
|
||||
* @author Mark Paluch
|
||||
* @since 5.2
|
||||
* @see ReactiveTransactionSynchronization
|
||||
* @see TransactionSynchronization
|
||||
*/
|
||||
public abstract class TransactionContextManager {
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ public abstract class TransactionContextManager {
|
|||
/**
|
||||
* Return a {@link Function} to create or associate a new {@link TransactionContext}.
|
||||
* Interaction with transactional resources through
|
||||
* {@link ReactiveTransactionSynchronizationManager} requires a TransactionContext
|
||||
* {@link TransactionSynchronizationManager} requires a TransactionContext
|
||||
* to be registered in the subscriber context.
|
||||
* @return functional context registration.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import reactor.core.publisher.Mono;
|
|||
* Interface for reactive transaction synchronization callbacks.
|
||||
* Supported by {@link AbstractReactiveTransactionManager}.
|
||||
*
|
||||
* <p>ReactiveTransactionSynchronization implementations can implement the
|
||||
* <p>TransactionSynchronization implementations can implement the
|
||||
* {@link org.springframework.core.Ordered} interface to influence their execution order.
|
||||
* A synchronization that does not implement the {@link org.springframework.core.Ordered}
|
||||
* interface is appended to the end of the synchronization chain.
|
||||
|
|
@ -31,11 +31,12 @@ import reactor.core.publisher.Mono;
|
|||
* allowing for fine-grained interaction with their execution order (if necessary).
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @see ReactiveTransactionSynchronizationManager
|
||||
* @see TransactionSynchronizationManager
|
||||
* @see AbstractReactiveTransactionManager
|
||||
*/
|
||||
public interface ReactiveTransactionSynchronization {
|
||||
public interface TransactionSynchronization {
|
||||
|
||||
/** Completion status in case of proper commit. */
|
||||
int STATUS_COMMITTED = 0;
|
||||
|
|
@ -50,7 +51,7 @@ public interface ReactiveTransactionSynchronization {
|
|||
/**
|
||||
* Suspend this synchronization.
|
||||
* Supposed to unbind resources from TransactionSynchronizationManager if managing any.
|
||||
* @see ReactiveTransactionSynchronizationManager#unbindResource
|
||||
* @see TransactionSynchronizationManager#unbindResource
|
||||
*/
|
||||
default Mono<Void> suspend() {
|
||||
return Mono.empty();
|
||||
|
|
@ -59,7 +60,7 @@ public interface ReactiveTransactionSynchronization {
|
|||
/**
|
||||
* Resume this synchronization.
|
||||
* Supposed to rebind resources to TransactionSynchronizationManager if managing any.
|
||||
* @see ReactiveTransactionSynchronizationManager#bindResource
|
||||
* @see TransactionSynchronizationManager#bindResource
|
||||
*/
|
||||
default Mono<Void> resume() {
|
||||
return Mono.empty();
|
||||
|
|
@ -67,29 +67,29 @@ import org.springframework.util.Assert;
|
|||
* @since 5.2
|
||||
* @see #isSynchronizationActive
|
||||
* @see #registerSynchronization
|
||||
* @see ReactiveTransactionSynchronization
|
||||
* @see TransactionSynchronization
|
||||
*/
|
||||
public class ReactiveTransactionSynchronizationManager {
|
||||
public class TransactionSynchronizationManager {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationManager.class);
|
||||
private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
|
||||
|
||||
private final TransactionContext transactionContext;
|
||||
|
||||
|
||||
public ReactiveTransactionSynchronizationManager(TransactionContext transactionContext) {
|
||||
public TransactionSynchronizationManager(TransactionContext transactionContext) {
|
||||
this.transactionContext = transactionContext;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the ReactiveTransactionSynchronizationManager of the current transaction.
|
||||
* Return the TransactionSynchronizationManager of the current transaction.
|
||||
* Mainly intended for code that wants to bind resources or synchronizations.
|
||||
* rollback-only but not throw an application exception.
|
||||
* @throws NoTransactionException if the transaction info cannot be found,
|
||||
* because the method was invoked outside a managed transaction.
|
||||
*/
|
||||
public static Mono<ReactiveTransactionSynchronizationManager> currentTransaction() {
|
||||
return TransactionContextManager.currentContext().map(ReactiveTransactionSynchronizationManager::new);
|
||||
public static Mono<TransactionSynchronizationManager> currentTransaction() {
|
||||
return TransactionContextManager.currentContext().map(TransactionSynchronizationManager::new);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -98,7 +98,7 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
* @return if there is a value bound to the current thread
|
||||
*/
|
||||
public boolean hasResource(Object key) {
|
||||
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object value = doGetResource(actualKey);
|
||||
return (value != null);
|
||||
}
|
||||
|
|
@ -111,7 +111,7 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
*/
|
||||
@Nullable
|
||||
public Object getResource(Object key) {
|
||||
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object value = doGetResource(actualKey);
|
||||
if (value != null && logger.isTraceEnabled()) {
|
||||
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to context [" +
|
||||
|
|
@ -137,7 +137,7 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
* @throws IllegalStateException if there is already a value bound to the context
|
||||
*/
|
||||
public void bindResource(Object key, Object value) throws IllegalStateException {
|
||||
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Assert.notNull(value, "Value must not be null");
|
||||
Map<Object, Object> map = this.transactionContext.getResources();
|
||||
Object oldValue = map.put(actualKey, value);
|
||||
|
|
@ -158,7 +158,7 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
* @throws IllegalStateException if there is no value bound to the context
|
||||
*/
|
||||
public Object unbindResource(Object key) throws IllegalStateException {
|
||||
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object value = doUnbindResource(actualKey);
|
||||
if (value == null) {
|
||||
throw new IllegalStateException(
|
||||
|
|
@ -174,7 +174,7 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
*/
|
||||
@Nullable
|
||||
public Object unbindResourceIfPossible(Object key) {
|
||||
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
|
||||
return doUnbindResource(actualKey);
|
||||
}
|
||||
|
||||
|
|
@ -229,11 +229,11 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
* @throws IllegalStateException if transaction synchronization is not active
|
||||
* @see org.springframework.core.Ordered
|
||||
*/
|
||||
public void registerSynchronization(ReactiveTransactionSynchronization synchronization)
|
||||
public void registerSynchronization(TransactionSynchronization synchronization)
|
||||
throws IllegalStateException {
|
||||
|
||||
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
|
||||
Set<ReactiveTransactionSynchronization> synchs = this.transactionContext.getSynchronizations();
|
||||
Set<TransactionSynchronization> synchs = this.transactionContext.getSynchronizations();
|
||||
if (synchs == null) {
|
||||
throw new IllegalStateException("Transaction synchronization is not active");
|
||||
}
|
||||
|
|
@ -245,10 +245,10 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
* for the current context.
|
||||
* @return unmodifiable List of TransactionSynchronization instances
|
||||
* @throws IllegalStateException if synchronization is not active
|
||||
* @see ReactiveTransactionSynchronization
|
||||
* @see TransactionSynchronization
|
||||
*/
|
||||
public List<ReactiveTransactionSynchronization> getSynchronizations() throws IllegalStateException {
|
||||
Set<ReactiveTransactionSynchronization> synchs = this.transactionContext.getSynchronizations();
|
||||
public List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
|
||||
Set<TransactionSynchronization> synchs = this.transactionContext.getSynchronizations();
|
||||
if (synchs == null) {
|
||||
throw new IllegalStateException("Transaction synchronization is not active");
|
||||
}
|
||||
|
|
@ -260,7 +260,7 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
}
|
||||
else {
|
||||
// Sort lazily here, not in registerSynchronization.
|
||||
List<ReactiveTransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
|
||||
List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
|
||||
AnnotationAwareOrderComparator.sort(sortedSynchs);
|
||||
return Collections.unmodifiableList(sortedSynchs);
|
||||
}
|
||||
|
|
@ -325,7 +325,7 @@ public class ReactiveTransactionSynchronizationManager {
|
|||
* to suppress change detection on commit. The present method is meant
|
||||
* to be used for earlier read-only checks.
|
||||
* @see org.springframework.transaction.TransactionDefinition#isReadOnly()
|
||||
* @see ReactiveTransactionSynchronization#beforeCommit(boolean)
|
||||
* @see TransactionSynchronization#beforeCommit(boolean)
|
||||
*/
|
||||
public boolean isCurrentTransactionReadOnly() {
|
||||
return this.transactionContext.isCurrentTransactionReadOnly();
|
||||
|
|
@ -29,21 +29,21 @@ import org.springframework.util.Assert;
|
|||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* Utility methods for triggering specific {@link ReactiveTransactionSynchronization}
|
||||
* Utility methods for triggering specific {@link TransactionSynchronization}
|
||||
* callback methods on all currently registered synchronizations.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.2
|
||||
* @see ReactiveTransactionSynchronization
|
||||
* @see ReactiveTransactionSynchronizationManager#getSynchronizations()
|
||||
* @see TransactionSynchronization
|
||||
* @see TransactionSynchronizationManager#getSynchronizations()
|
||||
*/
|
||||
abstract class ReactiveTransactionSynchronizationUtils {
|
||||
abstract class TransactionSynchronizationUtils {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationUtils.class);
|
||||
private static final Log logger = LogFactory.getLog(TransactionSynchronizationUtils.class);
|
||||
|
||||
private static final boolean aopAvailable = ClassUtils.isPresent(
|
||||
"org.springframework.aop.scope.ScopedObject", ReactiveTransactionSynchronizationUtils.class.getClassLoader());
|
||||
"org.springframework.aop.scope.ScopedObject", TransactionSynchronizationUtils.class.getClassLoader());
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -68,51 +68,51 @@ abstract class ReactiveTransactionSynchronizationUtils {
|
|||
|
||||
/**
|
||||
* Actually invoke the {@code triggerBeforeCommit} methods of the
|
||||
* given Spring ReactiveTransactionSynchronization objects.
|
||||
* @param synchronizations a List of ReactiveTransactionSynchronization objects
|
||||
* @see ReactiveTransactionSynchronization#beforeCommit(boolean)
|
||||
* given Spring TransactionSynchronization objects.
|
||||
* @param synchronizations a List of TransactionSynchronization objects
|
||||
* @see TransactionSynchronization#beforeCommit(boolean)
|
||||
*/
|
||||
public static Mono<Void> triggerBeforeCommit(Collection<ReactiveTransactionSynchronization> synchronizations, boolean readOnly) {
|
||||
public static Mono<Void> triggerBeforeCommit(Collection<TransactionSynchronization> synchronizations, boolean readOnly) {
|
||||
return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly)).then();
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually invoke the {@code beforeCompletion} methods of the
|
||||
* given Spring ReactiveTransactionSynchronization objects.
|
||||
* @param synchronizations a List of ReactiveTransactionSynchronization objects
|
||||
* @see ReactiveTransactionSynchronization#beforeCompletion()
|
||||
* given Spring TransactionSynchronization objects.
|
||||
* @param synchronizations a List of TransactionSynchronization objects
|
||||
* @see TransactionSynchronization#beforeCompletion()
|
||||
*/
|
||||
public static Mono<Void> triggerBeforeCompletion(Collection<ReactiveTransactionSynchronization> synchronizations) {
|
||||
public static Mono<Void> triggerBeforeCompletion(Collection<TransactionSynchronization> synchronizations) {
|
||||
return Flux.fromIterable(synchronizations)
|
||||
.concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) ->
|
||||
.concatMap(TransactionSynchronization::beforeCompletion).onErrorContinue((t, o) ->
|
||||
logger.error("TransactionSynchronization.beforeCompletion threw exception", t)).then();
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually invoke the {@code afterCommit} methods of the
|
||||
* given Spring ReactiveTransactionSynchronization objects.
|
||||
* @param synchronizations a List of ReactiveTransactionSynchronization objects
|
||||
* @see ReactiveTransactionSynchronization#afterCommit()
|
||||
* given Spring TransactionSynchronization objects.
|
||||
* @param synchronizations a List of TransactionSynchronization objects
|
||||
* @see TransactionSynchronization#afterCommit()
|
||||
*/
|
||||
public static Mono<Void> invokeAfterCommit(Collection<ReactiveTransactionSynchronization> synchronizations) {
|
||||
public static Mono<Void> invokeAfterCommit(Collection<TransactionSynchronization> synchronizations) {
|
||||
return Flux.fromIterable(synchronizations)
|
||||
.concatMap(ReactiveTransactionSynchronization::afterCommit)
|
||||
.concatMap(TransactionSynchronization::afterCommit)
|
||||
.then();
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually invoke the {@code afterCompletion} methods of the
|
||||
* given Spring ReactiveTransactionSynchronization objects.
|
||||
* @param synchronizations a List of ReactiveTransactionSynchronization objects
|
||||
* given Spring TransactionSynchronization objects.
|
||||
* @param synchronizations a List of TransactionSynchronization objects
|
||||
* @param completionStatus the completion status according to the
|
||||
* constants in the ReactiveTransactionSynchronization interface
|
||||
* @see ReactiveTransactionSynchronization#afterCompletion(int)
|
||||
* @see ReactiveTransactionSynchronization#STATUS_COMMITTED
|
||||
* @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK
|
||||
* @see ReactiveTransactionSynchronization#STATUS_UNKNOWN
|
||||
* constants in the TransactionSynchronization interface
|
||||
* @see TransactionSynchronization#afterCompletion(int)
|
||||
* @see TransactionSynchronization#STATUS_COMMITTED
|
||||
* @see TransactionSynchronization#STATUS_ROLLED_BACK
|
||||
* @see TransactionSynchronization#STATUS_UNKNOWN
|
||||
*/
|
||||
public static Mono<Void> invokeAfterCompletion(
|
||||
Collection<ReactiveTransactionSynchronization> synchronizations, int completionStatus) {
|
||||
Collection<TransactionSynchronization> synchronizations, int completionStatus) {
|
||||
|
||||
return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus))
|
||||
.onErrorContinue((t, o) -> logger.error("TransactionSynchronization.afterCompletion threw exception", t)).then();
|
||||
|
|
@ -56,7 +56,7 @@ public interface TransactionalOperator {
|
|||
static TransactionalOperator create(
|
||||
ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){
|
||||
|
||||
return new DefaultTransactionalOperator(transactionManager, transactionDefinition);
|
||||
return new TransactionalOperatorImpl(transactionManager, transactionDefinition);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -93,6 +93,6 @@ public interface TransactionalOperator {
|
|||
* @throws TransactionException in case of initialization, rollback, or system errors
|
||||
* @throws RuntimeException if thrown by the TransactionCallback
|
||||
*/
|
||||
<T> Flux<T> execute(ReactiveTransactionCallback<T> action) throws TransactionException;
|
||||
<T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,8 +21,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.transaction.ReactiveTransaction;
|
||||
import org.springframework.transaction.ReactiveTransactionManager;
|
||||
import org.springframework.transaction.ReactiveTransactionStatus;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.TransactionException;
|
||||
import org.springframework.transaction.TransactionSystemException;
|
||||
|
|
@ -39,9 +39,9 @@ import org.springframework.util.Assert;
|
|||
* @see ReactiveTransactionManager
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
final class DefaultTransactionalOperator implements TransactionalOperator {
|
||||
final class TransactionalOperatorImpl implements TransactionalOperator {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(DefaultTransactionalOperator.class);
|
||||
private static final Log logger = LogFactory.getLog(TransactionalOperatorImpl.class);
|
||||
|
||||
private final ReactiveTransactionManager transactionManager;
|
||||
|
||||
|
|
@ -55,7 +55,7 @@ final class DefaultTransactionalOperator implements TransactionalOperator {
|
|||
* @param transactionDefinition the transaction definition to copy the
|
||||
* default settings from. Local properties can still be set to change values.
|
||||
*/
|
||||
DefaultTransactionalOperator(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
|
||||
TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
|
||||
Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null");
|
||||
Assert.notNull(transactionManager, "TransactionDefinition must not be null");
|
||||
this.transactionManager = transactionManager;
|
||||
|
|
@ -72,9 +72,9 @@ final class DefaultTransactionalOperator implements TransactionalOperator {
|
|||
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> execute(ReactiveTransactionCallback<T> action) throws TransactionException {
|
||||
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {
|
||||
return TransactionContextManager.currentContext().flatMapMany(context -> {
|
||||
Mono<ReactiveTransactionStatus> status = this.transactionManager.getTransaction(this.transactionDefinition);
|
||||
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
|
||||
return status.flatMapMany(it -> {
|
||||
// This is an around advice: Invoke the next interceptor in the chain.
|
||||
// This will normally result in a target object being invoked.
|
||||
|
|
@ -98,7 +98,7 @@ final class DefaultTransactionalOperator implements TransactionalOperator {
|
|||
* @param ex the thrown application exception or error
|
||||
* @throws TransactionException in case of a rollback error
|
||||
*/
|
||||
private Mono<Void> rollbackOnException(ReactiveTransactionStatus status, Throwable ex) throws TransactionException {
|
||||
private Mono<Void> rollbackOnException(ReactiveTransaction status, Throwable ex) throws TransactionException {
|
||||
logger.debug("Initiating transaction rollback on application exception", ex);
|
||||
return this.transactionManager.rollback(status).onErrorMap(ex2 -> {
|
||||
logger.error("Application exception overridden by rollback exception", ex);
|
||||
|
|
@ -113,8 +113,8 @@ final class DefaultTransactionalOperator implements TransactionalOperator {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
return (this == other || (super.equals(other) && (!(other instanceof DefaultTransactionalOperator) ||
|
||||
getTransactionManager() == ((DefaultTransactionalOperator) other).getTransactionManager())));
|
||||
return (this == other || (super.equals(other) && (!(other instanceof TransactionalOperatorImpl) ||
|
||||
getTransactionManager() == ((TransactionalOperatorImpl) other).getTransactionManager())));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -55,7 +55,7 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
|
|||
|
||||
|
||||
//---------------------------------------------------------------------
|
||||
// Handling of current transaction state
|
||||
// Implementation of TransactionExecution
|
||||
//---------------------------------------------------------------------
|
||||
|
||||
@Override
|
||||
|
|
@ -93,13 +93,6 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* This implementations is empty, considering flush as a no-op.
|
||||
*/
|
||||
@Override
|
||||
public void flush() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this transaction as completed, that is, committed or rolled back.
|
||||
*/
|
||||
|
|
@ -117,6 +110,11 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
|
|||
// Handling of current savepoint state
|
||||
//---------------------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public boolean hasSavepoint() {
|
||||
return (this.savepoint != null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a savepoint for this transaction. Useful for PROPAGATION_NESTED.
|
||||
* @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NESTED
|
||||
|
|
@ -133,11 +131,6 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
|
|||
return this.savepoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasSavepoint() {
|
||||
return (this.savepoint != null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a savepoint and hold it for the transaction.
|
||||
* @throws org.springframework.transaction.NestedTransactionNotSupportedException
|
||||
|
|
@ -223,4 +216,16 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
|
|||
throw new NestedTransactionNotSupportedException("This transaction does not support savepoints");
|
||||
}
|
||||
|
||||
|
||||
//---------------------------------------------------------------------
|
||||
// Flushing support
|
||||
//---------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* This implementations is empty, considering flush as a no-op.
|
||||
*/
|
||||
@Override
|
||||
public void flush() {
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -165,18 +165,6 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus {
|
|||
((SmartTransactionObject) this.transaction).isRollbackOnly());
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegate the flushing to the transaction object, provided that the latter
|
||||
* implements the {@link SmartTransactionObject} interface.
|
||||
* @see SmartTransactionObject#flush()
|
||||
*/
|
||||
@Override
|
||||
public void flush() {
|
||||
if (this.transaction instanceof SmartTransactionObject) {
|
||||
((SmartTransactionObject) this.transaction).flush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This implementation exposes the {@link SavepointManager} interface
|
||||
* of the underlying transaction object, if any.
|
||||
|
|
@ -203,4 +191,16 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus {
|
|||
return (this.transaction instanceof SavepointManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegate the flushing to the transaction object, provided that the latter
|
||||
* implements the {@link SmartTransactionObject} interface.
|
||||
* @see SmartTransactionObject#flush()
|
||||
*/
|
||||
@Override
|
||||
public void flush() {
|
||||
if (this.transaction instanceof SmartTransactionObject) {
|
||||
((SmartTransactionObject) this.transaction).flush();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
|
||||
|
||||
@Override
|
||||
protected Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) {
|
||||
protected Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) {
|
||||
return TRANSACTION;
|
||||
}
|
||||
|
||||
|
|
@ -62,7 +62,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) {
|
||||
protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) {
|
||||
if (!TRANSACTION.equals(transaction)) {
|
||||
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
|
||||
}
|
||||
|
|
@ -73,7 +73,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) {
|
||||
protected Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) {
|
||||
if (!TRANSACTION.equals(status.getTransaction())) {
|
||||
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
|
||||
}
|
||||
|
|
@ -81,7 +81,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) {
|
||||
protected Mono<Void> doRollback(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) {
|
||||
if (!TRANSACTION.equals(status.getTransaction())) {
|
||||
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
|
||||
}
|
||||
|
|
@ -89,7 +89,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) {
|
||||
protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) {
|
||||
if (!TRANSACTION.equals(status.getTransaction())) {
|
||||
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import reactor.test.StepVerifier;
|
|||
|
||||
import org.springframework.transaction.IllegalTransactionStateException;
|
||||
import org.springframework.transaction.ReactiveTransactionManager;
|
||||
import org.springframework.transaction.ReactiveTransactionStatus;
|
||||
import org.springframework.transaction.ReactiveTransaction;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.support.DefaultTransactionDefinition;
|
||||
|
||||
|
|
@ -34,27 +34,27 @@ import static org.junit.Assert.*;
|
|||
*
|
||||
* @author Mark Paluch
|
||||
*/
|
||||
public class ReactiveTransactionSupportUnitTests {
|
||||
public class ReactiveTransactionSupportTests {
|
||||
|
||||
@Test
|
||||
public void noExistingTransaction() {
|
||||
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true);
|
||||
|
||||
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
|
||||
.as(StepVerifier::create).consumeNextWith(actual -> {
|
||||
assertFalse(actual.hasTransaction());
|
||||
}).verifyComplete();
|
||||
|
||||
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
|
||||
.cast(DefaultReactiveTransactionStatus.class).subscriberContext(TransactionContextManager.createTransactionContext())
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
|
||||
.cast(GenericReactiveTransaction.class).subscriberContext(TransactionContextManager.createTransactionContext())
|
||||
.as(StepVerifier::create).consumeNextWith(actual -> {
|
||||
assertTrue(actual.hasTransaction());
|
||||
assertTrue(actual.isNewTransaction());
|
||||
}).verifyComplete();
|
||||
|
||||
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
|
||||
.as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify();
|
||||
}
|
||||
|
||||
|
|
@ -62,22 +62,22 @@ public class ReactiveTransactionSupportUnitTests {
|
|||
public void existingTransaction() {
|
||||
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true);
|
||||
|
||||
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
|
||||
.as(StepVerifier::create).consumeNextWith(actual -> {
|
||||
assertNotNull(actual.getTransaction());
|
||||
assertFalse(actual.isNewTransaction());
|
||||
}).verifyComplete();
|
||||
|
||||
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
|
||||
.as(StepVerifier::create).consumeNextWith(actual -> {
|
||||
assertNotNull(actual.getTransaction());
|
||||
assertFalse(actual.isNewTransaction());
|
||||
}).verifyComplete();
|
||||
|
||||
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
|
||||
.as(StepVerifier::create).consumeNextWith(actual -> {
|
||||
assertNotNull(actual.getTransaction());
|
||||
assertFalse(actual.isNewTransaction());
|
||||
|
|
@ -87,7 +87,7 @@ public class ReactiveTransactionSupportUnitTests {
|
|||
@Test
|
||||
public void commitWithoutExistingTransaction() {
|
||||
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
|
||||
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext())
|
||||
.as(StepVerifier::create).verifyComplete();
|
||||
|
||||
|
|
@ -100,7 +100,7 @@ public class ReactiveTransactionSupportUnitTests {
|
|||
@Test
|
||||
public void rollbackWithoutExistingTransaction() {
|
||||
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
|
||||
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
|
||||
.verifyComplete();
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ public class ReactiveTransactionSupportUnitTests {
|
|||
@Test
|
||||
public void rollbackOnlyWithoutExistingTransaction() {
|
||||
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
|
||||
tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly)
|
||||
.flatMap(tm::commit)
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
|
||||
.verifyComplete();
|
||||
|
|
@ -127,7 +127,7 @@ public class ReactiveTransactionSupportUnitTests {
|
|||
@Test
|
||||
public void commitWithExistingTransaction() {
|
||||
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
|
||||
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext())
|
||||
.as(StepVerifier::create).verifyComplete();
|
||||
|
||||
|
|
@ -140,7 +140,7 @@ public class ReactiveTransactionSupportUnitTests {
|
|||
@Test
|
||||
public void rollbackWithExistingTransaction() {
|
||||
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
|
||||
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
|
||||
.verifyComplete();
|
||||
|
||||
|
|
@ -153,7 +153,7 @@ public class ReactiveTransactionSupportUnitTests {
|
|||
@Test
|
||||
public void rollbackOnlyWithExistingTransaction() {
|
||||
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
|
||||
tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit)
|
||||
tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly).flatMap(tm::commit)
|
||||
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
|
||||
.verifyComplete();
|
||||
|
||||
Loading…
Reference in New Issue