|
|
|
@ -1,5 +1,5 @@
|
|
|
|
|
/*
|
|
|
|
|
* Copyright 2002-2019 the original author or authors.
|
|
|
|
|
* Copyright 2002-2020 the original author or authors.
|
|
|
|
|
*
|
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
@ -196,7 +196,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
|
|
|
|
|
prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status)
|
|
|
|
|
.onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx ->
|
|
|
|
|
resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx).then(Mono.error(beginEx)));
|
|
|
|
|
resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx)
|
|
|
|
|
.then(Mono.error(beginEx)));
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -281,7 +282,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
if (synchronizationManager.isSynchronizationActive()) {
|
|
|
|
|
Mono<List<TransactionSynchronization>> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager);
|
|
|
|
|
return suspendedSynchronizations.flatMap(synchronizations -> {
|
|
|
|
|
Mono<Optional<Object>> suspendedResources = (transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty()));
|
|
|
|
|
Mono<Optional<Object>> suspendedResources = (transaction != null ?
|
|
|
|
|
doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) :
|
|
|
|
|
Mono.just(Optional.empty()));
|
|
|
|
|
return suspendedResources.map(it -> {
|
|
|
|
|
String name = synchronizationManager.getCurrentTransactionName();
|
|
|
|
|
synchronizationManager.setCurrentTransactionName(null);
|
|
|
|
@ -293,12 +296,15 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
synchronizationManager.setActualTransactionActive(false);
|
|
|
|
|
return new SuspendedResourcesHolder(
|
|
|
|
|
it.orElse(null), synchronizations, name, readOnly, isolationLevel, wasActive);
|
|
|
|
|
}).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> doResumeSynchronization(synchronizationManager, synchronizations).cast(SuspendedResourcesHolder.class));
|
|
|
|
|
}).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR,
|
|
|
|
|
ex -> doResumeSynchronization(synchronizationManager, synchronizations)
|
|
|
|
|
.cast(SuspendedResourcesHolder.class));
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
else if (transaction != null) {
|
|
|
|
|
// Transaction active but no synchronization active.
|
|
|
|
|
Mono<Optional<Object>> suspendedResources = doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty());
|
|
|
|
|
Mono<Optional<Object>> suspendedResources =
|
|
|
|
|
doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty());
|
|
|
|
|
return suspendedResources.map(it -> new SuspendedResourcesHolder(it.orElse(null)));
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
@ -445,10 +451,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
// Eclipse compiler with regard to inferred generics.
|
|
|
|
|
Mono<Object> result = propagateException;
|
|
|
|
|
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
|
|
|
|
|
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException);
|
|
|
|
|
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
|
|
|
|
|
.then(propagateException);
|
|
|
|
|
}
|
|
|
|
|
else if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) {
|
|
|
|
|
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN).then(propagateException);
|
|
|
|
|
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
|
|
|
|
|
.then(propagateException);
|
|
|
|
|
}
|
|
|
|
|
else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
|
|
|
|
|
Mono<Void> mono;
|
|
|
|
@ -458,7 +466,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
else {
|
|
|
|
|
mono = Mono.empty();
|
|
|
|
|
}
|
|
|
|
|
result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex)).then(propagateException);
|
|
|
|
|
result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex))
|
|
|
|
|
.then(propagateException);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
@ -573,9 +582,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
if (status.isDebug()) {
|
|
|
|
|
logger.trace("Triggering beforeCommit synchronization");
|
|
|
|
|
}
|
|
|
|
|
return TransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly());
|
|
|
|
|
return TransactionSynchronizationUtils.triggerBeforeCommit(
|
|
|
|
|
synchronizationManager.getSynchronizations(), status.isReadOnly());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return Mono.empty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -593,7 +602,6 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
}
|
|
|
|
|
return TransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return Mono.empty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -611,7 +619,6 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
}
|
|
|
|
|
return TransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return Mono.empty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -639,10 +646,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
// Existing transaction that we participate in, controlled outside
|
|
|
|
|
// of the scope of this Spring transaction manager -> try to register
|
|
|
|
|
// an afterCompletion callback with the existing (JTA) transaction.
|
|
|
|
|
return registerAfterCompletionWithExistingTransaction(synchronizationManager, status.getTransaction(), synchronizations);
|
|
|
|
|
return registerAfterCompletionWithExistingTransaction(
|
|
|
|
|
synchronizationManager, status.getTransaction(), synchronizations);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return Mono.empty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -690,7 +697,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
logger.debug("Resuming suspended transaction after completion of inner transaction");
|
|
|
|
|
}
|
|
|
|
|
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
|
|
|
|
|
return cleanup.then(resume(synchronizationManager, transaction, (SuspendedResourcesHolder) status.getSuspendedResources()));
|
|
|
|
|
return cleanup.then(resume(synchronizationManager, transaction,
|
|
|
|
|
(SuspendedResourcesHolder) status.getSuspendedResources()));
|
|
|
|
|
}
|
|
|
|
|
return cleanup;
|
|
|
|
|
});
|
|
|
|
@ -716,14 +724,16 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
* returned transaction object.
|
|
|
|
|
* @param synchronizationManager the synchronization manager bound to the current transaction
|
|
|
|
|
* @return the current transaction object
|
|
|
|
|
* @throws org.springframework.transaction.CannotCreateTransactionException if transaction support is not available
|
|
|
|
|
* @throws org.springframework.transaction.CannotCreateTransactionException
|
|
|
|
|
* if transaction support is not available
|
|
|
|
|
* @throws TransactionException in case of lookup or system errors
|
|
|
|
|
* @see #doBegin
|
|
|
|
|
* @see #doCommit
|
|
|
|
|
* @see #doRollback
|
|
|
|
|
* @see GenericReactiveTransaction#getTransaction
|
|
|
|
|
*/
|
|
|
|
|
protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) throws TransactionException;
|
|
|
|
|
protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager)
|
|
|
|
|
throws TransactionException;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Check if the given transaction object indicates an existing transaction
|
|
|
|
@ -775,7 +785,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
* @param transaction the transaction object returned by {@code doGetTransaction}
|
|
|
|
|
* @return an object that holds suspended resources
|
|
|
|
|
* (will be kept unexamined for passing it into doResume)
|
|
|
|
|
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if suspending is not supported by the transaction manager implementation
|
|
|
|
|
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
|
|
|
|
|
* if suspending is not supported by the transaction manager implementation
|
|
|
|
|
* @throws TransactionException in case of system errors
|
|
|
|
|
* @see #doResume
|
|
|
|
|
*/
|
|
|
|
@ -795,7 +806,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|
|
|
|
* @param transaction the transaction object returned by {@code doGetTransaction}
|
|
|
|
|
* @param suspendedResources the object that holds suspended resources,
|
|
|
|
|
* as returned by doSuspend
|
|
|
|
|
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if resuming is not supported by the transaction manager implementation
|
|
|
|
|
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
|
|
|
|
|
* if suspending is not supported by the transaction manager implementation
|
|
|
|
|
* @throws TransactionException in case of system errors
|
|
|
|
|
* @see #doSuspend
|
|
|
|
|
*/
|
|
|
|
|