Remove legacy config options from AbstractReactiveTransactionManager

Includes general revision of reactive transaction sources.

See gh-22646
This commit is contained in:
Juergen Hoeller 2019-05-02 12:04:25 +02:00
parent beea83b9d2
commit b5e5e33078
19 changed files with 384 additions and 978 deletions

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -18,8 +18,6 @@ package org.springframework.transaction;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
/**
* This is the central interface in Spring's reactive transaction infrastructure.
* Applications can use this directly, but it is not primarily meant as API:
@ -27,6 +25,7 @@ import org.springframework.lang.Nullable;
* declarative transaction demarcation through AOP.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean
*/
@ -43,7 +42,7 @@ public interface ReactiveTransactionManager {
* <p>An exception to the above rule is the read-only flag, which should be
* ignored if no explicit read-only mode is supported. Essentially, the
* read-only flag is just a hint for potential optimization.
* @param definition the TransactionDefinition instance (can be empty for defaults),
* @param definition the TransactionDefinition instance,
* describing propagation behavior, isolation level, timeout etc.
* @return transaction status object representing the new or current transaction
* @throws TransactionException in case of lookup, creation, or system errors
@ -55,7 +54,7 @@ public interface ReactiveTransactionManager {
* @see TransactionDefinition#getTimeout
* @see TransactionDefinition#isReadOnly
*/
Mono<ReactiveTransactionStatus> getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
Mono<ReactiveTransactionStatus> getTransaction(TransactionDefinition definition) throws TransactionException;
/**
* Commit the given transaction, with regard to its status. If the transaction

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,8 +16,6 @@
package org.springframework.transaction;
import reactor.core.publisher.Mono;
/**
* Representation of the status of a transaction exposing a reactive
* interface.
@ -27,6 +25,7 @@ import reactor.core.publisher.Mono;
* an exception that causes an implicit rollback).
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see #setRollbackOnly()
* @see ReactiveTransactionManager#getTransaction
@ -45,10 +44,10 @@ public interface ReactiveTransactionStatus {
* that the only possible outcome of the transaction may be a rollback, as
* alternative to throwing an exception which would in turn trigger a rollback.
* <p>This is mainly intended for transactions managed by
* {@link org.springframework.transaction.reactive.support.TransactionalOperator} or
* {@link org.springframework.transaction.interceptor.ReactiveTransactionInterceptor},
* {@link org.springframework.transaction.reactive.TransactionalOperator} or
* {@link org.springframework.transaction.interceptor.TransactionInterceptor},
* where the actual commit/rollback decision is made by the container.
* @see org.springframework.transaction.reactive.support.ReactiveTransactionCallback#doInTransaction
* @see org.springframework.transaction.reactive.ReactiveTransactionCallback#doInTransaction
* @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn
*/
void setRollbackOnly();
@ -59,15 +58,6 @@ public interface ReactiveTransactionStatus {
*/
boolean isRollbackOnly();
/**
* Flush the underlying session to the datastore, if applicable.
* <p>This is effectively just a hint and may be a no-op if the underlying
* transaction manager does not have a flush concept. A flush signal may
* get applied to the primary resource or to transaction synchronizations,
* depending on the underlying resource.
*/
Mono<Void> flush();
/**
* Return whether this transaction is completed, that is,
* whether it has already been committed or rolled back.
@ -75,4 +65,5 @@ public interface ReactiveTransactionStatus {
* @see ReactiveTransactionManager#rollback
*/
boolean isCompleted();
}

View File

@ -16,8 +16,6 @@
package org.springframework.transaction.reactive;
import reactor.core.publisher.Mono;
import org.springframework.transaction.ReactiveTransactionStatus;
/**
@ -29,6 +27,7 @@ import org.springframework.transaction.ReactiveTransactionStatus;
* underlying transaction object, and no transaction synchronization mechanism.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see #setRollbackOnly()
* @see #isRollbackOnly()
@ -52,42 +51,14 @@ public abstract class AbstractReactiveTransactionStatus implements ReactiveTrans
this.rollbackOnly = true;
}
/**
* Determine the rollback-only flag via checking both the local rollback-only flag
* of this TransactionStatus and the global rollback-only flag of the underlying
* transaction, if any.
* @see #isLocalRollbackOnly()
* @see #isGlobalRollbackOnly()
*/
@Override
public boolean isRollbackOnly() {
return (isLocalRollbackOnly() || isGlobalRollbackOnly());
}
/**
* Determine the rollback-only flag via checking this ReactiveTransactionStatus.
* <p>Will only return "true" if the application called {@code setRollbackOnly}
* on this TransactionStatus object.
*/
public boolean isLocalRollbackOnly() {
return this.rollbackOnly;
}
/**
* Template method for determining the global rollback-only flag of the
* underlying transaction, if any.
* <p>This implementation always returns {@code false}.
*/
public boolean isGlobalRollbackOnly() {
return false;
}
/**
* This implementations is empty, considering flush as a no-op.
*/
@Override
public Mono<Void> flush() {
return Mono.empty();
public boolean isRollbackOnly() {
return this.rollbackOnly;
}
/**

View File

@ -21,8 +21,8 @@ import org.springframework.transaction.ReactiveTransactionStatus;
import org.springframework.util.Assert;
/**
* Default implementation of the {@link ReactiveTransactionStatus}
* interface, used by {@link AbstractReactiveTransactionManager}. Based on the concept
* Default implementation of the {@link ReactiveTransactionStatus} interface,
* used by {@link AbstractReactiveTransactionManager}. Based on the concept
* of an underlying "transaction object".
*
* <p>Holds all status information that {@link AbstractReactiveTransactionManager}
@ -33,6 +33,7 @@ import org.springframework.util.Assert;
* implementations, in particular not for mock transaction managers in testing environments.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see AbstractReactiveTransactionManager
* @see #getTransaction
@ -72,6 +73,7 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio
public DefaultReactiveTransactionStatus(
@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
this.transaction = transaction;
this.newTransaction = newTransaction;
this.newSynchronization = newSynchronization;

View File

@ -21,12 +21,11 @@ import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.ReactiveTransactionStatus;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.ReactiveTransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.Assert;
/**
@ -34,26 +33,20 @@ import org.springframework.util.Assert;
* transaction exception handling.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
*/
@SuppressWarnings("serial")
class DefaultTransactionalOperator extends DefaultTransactionDefinition
implements TransactionalOperator {
final class DefaultTransactionalOperator implements TransactionalOperator {
private final Log logger = LogFactory.getLog(getClass());
private static final Log logger = LogFactory.getLog(DefaultTransactionalOperator.class);
private final ReactiveTransactionManager transactionManager;
/**
* Construct a new DefaultTransactionalOperator using the given transaction manager.
* @param transactionManager the transaction management strategy to be used
*/
DefaultTransactionalOperator(ReactiveTransactionManager transactionManager) {
Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null");
this.transactionManager = transactionManager;
}
private final TransactionDefinition transactionDefinition;
/**
* Construct a new TransactionTemplate using the given transaction manager,
@ -63,9 +56,10 @@ class DefaultTransactionalOperator extends DefaultTransactionDefinition
* default settings from. Local properties can still be set to change values.
*/
DefaultTransactionalOperator(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
super(transactionDefinition);
Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null");
Assert.notNull(transactionManager, "TransactionDefinition must not be null");
this.transactionManager = transactionManager;
this.transactionDefinition = transactionDefinition;
}
@ -76,29 +70,21 @@ class DefaultTransactionalOperator extends DefaultTransactionDefinition
return this.transactionManager;
}
@Override
public <T> Flux<T> execute(ReactiveTransactionCallback<T> action) throws TransactionException {
return TransactionContextManager.currentContext().flatMapMany(context -> {
Mono<ReactiveTransactionStatus> status = this.transactionManager.getTransaction(this);
Mono<ReactiveTransactionStatus> status = this.transactionManager.getTransaction(this.transactionDefinition);
return status.flatMapMany(it -> {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
Flux<Object> retVal = Flux.from(action.doInTransaction(it));
return retVal.onErrorResume(ex -> {
// Transactional code threw application exception -> rollback
return rollbackOnException(it, ex).then(Mono.error(ex));
}).materialize().flatMap(signal -> {
if (signal.isOnComplete()) {
return transactionManager.commit(it).materialize();
}
return Mono.just(signal);
return retVal.onErrorResume(ex -> rollbackOnException(it, ex).
then(Mono.error(ex))).materialize().flatMap(signal -> {
if (signal.isOnComplete()) {
return this.transactionManager.commit(it).materialize();
}
return Mono.just(signal);
}).<T>dematerialize();
});
})
@ -113,13 +99,9 @@ class DefaultTransactionalOperator extends DefaultTransactionDefinition
* @throws TransactionException in case of a rollback error
*/
private Mono<Void> rollbackOnException(ReactiveTransactionStatus status, Throwable ex) throws TransactionException {
logger.debug("Initiating transaction rollback on application exception", ex);
return this.transactionManager.rollback(status).onErrorMap(ex2 -> {
logger.error("Application exception overridden by rollback exception", ex);
if (ex2 instanceof TransactionSystemException) {
((TransactionSystemException) ex2).initApplicationException(ex);
}
@ -128,10 +110,16 @@ class DefaultTransactionalOperator extends DefaultTransactionDefinition
);
}
@Override
public boolean equals(Object other) {
return (this == other || (super.equals(other) && (!(other instanceof DefaultTransactionalOperator) ||
getTransactionManager() == ((DefaultTransactionalOperator) other).getTransactionManager())));
}
@Override
public int hashCode() {
return getTransactionManager().hashCode();
}
}

View File

@ -16,11 +16,10 @@
package org.springframework.transaction.reactive;
import org.springframework.transaction.support.ResourceHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import reactor.core.publisher.Mono;
import org.springframework.transaction.support.ResourceHolder;
/**
* {@link ReactiveTransactionSynchronization} implementation that manages a
* {@link ResourceHolder} bound through {@link ReactiveTransactionSynchronizationManager}.
@ -47,9 +46,11 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
* @param resourceHolder the ResourceHolder to manage
* @param resourceKey the key to bind the ResourceHolder for
* @param synchronizationManager the synchronization manager bound to the current transaction
* @see TransactionSynchronizationManager#bindResource
* @see ReactiveTransactionSynchronizationManager#bindResource
*/
public ReactiveResourceHolderSynchronization(H resourceHolder, K resourceKey, ReactiveTransactionSynchronizationManager synchronizationManager) {
public ReactiveResourceHolderSynchronization(
H resourceHolder, K resourceKey, ReactiveTransactionSynchronizationManager synchronizationManager) {
this.resourceHolder = resourceHolder;
this.resourceKey = resourceKey;
this.synchronizationManager = synchronizationManager;
@ -59,7 +60,7 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
@Override
public Mono<Void> suspend() {
if (this.holderActive) {
synchronizationManager.unbindResource(this.resourceKey);
this.synchronizationManager.unbindResource(this.resourceKey);
}
return Mono.empty();
}
@ -67,16 +68,11 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
@Override
public Mono<Void> resume() {
if (this.holderActive) {
synchronizationManager.bindResource(this.resourceKey, this.resourceHolder);
this.synchronizationManager.bindResource(this.resourceKey, this.resourceHolder);
}
return Mono.empty();
}
@Override
public Mono<Void> flush() {
return flushResource(this.resourceHolder);
}
@Override
public Mono<Void> beforeCommit(boolean readOnly) {
return Mono.empty();
@ -85,13 +81,12 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
@Override
public Mono<Void> beforeCompletion() {
if (shouldUnbindAtCompletion()) {
synchronizationManager.unbindResource(this.resourceKey);
this.synchronizationManager.unbindResource(this.resourceKey);
this.holderActive = false;
if (shouldReleaseBeforeCompletion()) {
return releaseResource(this.resourceHolder, this.resourceKey);
}
}
return Mono.empty();
}
@ -100,15 +95,12 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
if (!shouldReleaseBeforeCompletion()) {
return processResourceAfterCommit(this.resourceHolder);
}
return Mono.empty();
}
@Override
public Mono<Void> afterCompletion(int status) {
return Mono.defer(() -> {
Mono<Void> sync = Mono.empty();
if (shouldUnbindAtCompletion()) {
boolean releaseNecessary = false;
@ -116,20 +108,21 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
// The thread-bound resource holder might not be available anymore,
// since afterCompletion might get called from a different thread.
this.holderActive = false;
synchronizationManager.unbindResourceIfPossible(this.resourceKey);
this.synchronizationManager.unbindResourceIfPossible(this.resourceKey);
this.resourceHolder.unbound();
releaseNecessary = true;
} else {
}
else {
releaseNecessary = shouldReleaseAfterCompletion(this.resourceHolder);
}
if (releaseNecessary) {
sync = releaseResource(this.resourceHolder, this.resourceKey);
}
} else {
}
else {
// Probably a pre-bound resource...
sync = cleanupResource(this.resourceHolder, this.resourceKey, (status == STATUS_COMMITTED));
}
;
return sync.doFinally(s -> this.resourceHolder.reset());
});
}
@ -151,7 +144,6 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
* <p>Note that resources will only be released when they are
* unbound from the thread ({@link #shouldUnbindAtCompletion()}).
* <p>The default implementation returns {@code true}.
*
* @see #releaseResource
*/
protected boolean shouldReleaseBeforeCompletion() {
@ -163,27 +155,16 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
* transaction completion ({@code true}).
* <p>The default implementation returns {@code !shouldReleaseBeforeCompletion()},
* releasing after completion if no attempt was made before completion.
*
* @see #releaseResource
*/
protected boolean shouldReleaseAfterCompletion(H resourceHolder) {
return !shouldReleaseBeforeCompletion();
}
/**
* Flush callback for the given resource holder.
*
* @param resourceHolder the resource holder to flush
*/
protected Mono<Void> flushResource(H resourceHolder) {
return Mono.empty();
}
/**
* After-commit callback for the given resource holder.
* Only called when the resource hasn't been released yet
* ({@link #shouldReleaseBeforeCompletion()}).
*
* @param resourceHolder the resource holder to process
*/
protected Mono<Void> processResourceAfterCommit(H resourceHolder) {
@ -192,7 +173,6 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
/**
* Release the given resource (after it has been unbound from the thread).
*
* @param resourceHolder the resource holder to process
* @param resourceKey the key that the ResourceHolder was bound for
*/
@ -202,7 +182,6 @@ public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHo
/**
* Perform a cleanup on the given resource (which is left bound to the thread).
*
* @param resourceHolder the resource holder to process
* @param resourceKey the key that the ResourceHolder was bound for
* @param committed whether the transaction has committed ({@code true})

View File

@ -38,15 +38,10 @@ import org.springframework.transaction.ReactiveTransactionStatus;
public interface ReactiveTransactionCallback<T> {
/**
* Gets called by {@link TransactionalOperator#transactional} within a transactional context.
* Gets called by {@link TransactionalOperator} within a transactional context.
* Does not need to care about transactions itself, although it can retrieve and
* influence the status of the current transaction via the given status object,
* e.g. setting rollback-only.
* <p>Allows for returning a result object created within the transaction, i.e. a
* domain object or a collection of domain objects. A RuntimeException thrown by the
* callback is treated as application exception that enforces a rollback. Any such
* exception will be propagated to the caller of the template, unless there is a
* problem rolling back, in which case a TransactionException will be thrown.
* @param status associated transaction status
* @return a result publisher
* @see TransactionalOperator#transactional

View File

@ -18,8 +18,6 @@ package org.springframework.transaction.reactive;
import reactor.core.publisher.Mono;
import org.springframework.transaction.ReactiveTransactionStatus;
/**
* Interface for reactive transaction synchronization callbacks.
* Supported by {@link AbstractReactiveTransactionManager}.
@ -67,17 +65,8 @@ public interface ReactiveTransactionSynchronization {
return Mono.empty();
}
/**
* Flush the underlying session to the datastore, if applicable.
* @see ReactiveTransactionStatus#flush()
*/
default Mono<Void> flush() {
return Mono.empty();
}
/**
* Invoked before transaction commit (before "beforeCompletion").
* Can e.g. flush transactional O/R Mapping sessions to the database.
* <p>This callback does <i>not</i> mean that the transaction will actually be committed.
* A rollback decision can still occur after this method has been called. This callback
* is rather meant to perform work that's only relevant if a commit still has a chance

View File

@ -30,8 +30,6 @@ import reactor.core.publisher.Mono;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.lang.Nullable;
import org.springframework.transaction.NoTransactionException;
import org.springframework.transaction.support.ResourceHolder;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.util.Assert;
/**
@ -43,9 +41,9 @@ import org.springframework.util.Assert;
* to be removed before a new one can be set for the same key.
* Supports a list of transaction synchronizations if synchronization is active.
*
* <p>Resource management code should check for context-bound resources, e.g. database
* connections, via {@code getResource}. Such code is
* normally not supposed to bind resources to units of work, as this is the responsibility
* <p>Resource management code should check for context-bound resources, e.g.
* database connections, via {@code getResource}. Such code is normally not
* supposed to bind resources to units of work, as this is the responsibility
* of transaction managers. A further option is to lazily bind on first use if
* transaction synchronization is active, for performing transactions that span
* an arbitrary number of resources.
@ -61,16 +59,15 @@ import org.springframework.util.Assert;
* isn't active, there is either no current transaction, or the transaction manager
* doesn't support transaction synchronization.
*
* <p>Synchronization is for example used to always return the same resources
* within a transaction, e.g. a database connection for
* any given Connectionfactory or DatabaseFactory.
* <p>Synchronization is for example used to always return the same resources within
* a transaction, e.g. a database connection for any given connection factory.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see #isSynchronizationActive
* @see #registerSynchronization
* @see TransactionSynchronization
* @see AbstractReactiveTransactionManager#setTransactionSynchronization
* @see ReactiveTransactionSynchronization
*/
public class ReactiveTransactionSynchronizationManager {
@ -97,10 +94,8 @@ public class ReactiveTransactionSynchronizationManager {
/**
* Check if there is a resource for the given key bound to the current thread.
*
* @param key the key to check (usually the resource factory)
* @return if there is a value bound to the current thread
* @see ResourceTransactionManager#getResourceFactory()
*/
public boolean hasResource(Object key) {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
@ -110,11 +105,9 @@ public class ReactiveTransactionSynchronizationManager {
/**
* Retrieve a resource for the given key that is bound to the current thread.
*
* @param key the key to check (usually the resource factory)
* @return a value bound to the current thread (usually the active
* resource object), or {@code null} if none
* @see ResourceTransactionManager#getResourceFactory()
*/
@Nullable
public Object getResource(Object key) {
@ -122,7 +115,7 @@ public class ReactiveTransactionSynchronizationManager {
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to context [" +
transactionContext.getName() + "]");
this.transactionContext.getName() + "]");
}
return value;
}
@ -132,64 +125,50 @@ public class ReactiveTransactionSynchronizationManager {
*/
@Nullable
private Object doGetResource(Object actualKey) {
Map<Object, Object> map = transactionContext.getResources();
Map<Object, Object> map = this.transactionContext.getResources();
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
value = null;
}
return value;
}
/**
* Bind the given resource for the given key to the current context.
*
* @param key the key to bind the value to (usually the resource factory)
* @param value the value to bind (usually the active resource object)
* @throws IllegalStateException if there is already a value bound to the context
* @see ResourceTransactionManager#getResourceFactory()
*/
public void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = transactionContext.getResources();
Map<Object, Object> map = this.transactionContext.getResources();
Object oldValue = map.put(actualKey, value);
// Transparently suppress a ResourceHolder that was marked as void...
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to context [" + transactionContext.getName() + "]");
actualKey + "] bound to context [" + this.transactionContext.getName() + "]");
}
if (logger.isTraceEnabled()) {
logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to context [" +
transactionContext.getName() + "]");
this.transactionContext.getName() + "]");
}
}
/**
* Unbind a resource for the given key from the current context.
*
* @param key the key to unbind (usually the resource factory)
* @return the previously bound value (usually the active resource object)
* @throws IllegalStateException if there is no value bound to the context
* @see ResourceTransactionManager#getResourceFactory()
*/
public Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to context [" + transactionContext.getName() + "]");
"No value for key [" + actualKey + "] bound to context [" + this.transactionContext.getName() + "]");
}
return value;
}
/**
* Unbind a resource for the given key from the current context.
*
* @param key the key to unbind (usually the resource factory)
* @return the previously bound value, or {@code null} if none bound
*/
@ -204,19 +183,16 @@ public class ReactiveTransactionSynchronizationManager {
*/
@Nullable
private Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = transactionContext.getResources();
Map<Object, Object> map = this.transactionContext.getResources();
Object value = map.remove(actualKey);
// Transparently suppress a ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
if (value != null && logger.isTraceEnabled()) {
logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from context [" +
transactionContext.getName() + "]");
this.transactionContext.getName() + "]");
}
return value;
}
//-------------------------------------------------------------------------
// Management of transaction synchronizations
//-------------------------------------------------------------------------
@ -224,17 +200,15 @@ public class ReactiveTransactionSynchronizationManager {
/**
* Return if transaction synchronization is active for the current context.
* Can be called before register to avoid unnecessary instance creation.
*
* @see #registerSynchronization
*/
public boolean isSynchronizationActive() {
return (transactionContext.getSynchronizations() != null);
return (this.transactionContext.getSynchronizations() != null);
}
/**
* Activate transaction synchronization for the current context.
* Called by a transaction manager on transaction begin.
*
* @throws IllegalStateException if synchronization is already active
*/
public void initSynchronization() throws IllegalStateException {
@ -242,7 +216,7 @@ public class ReactiveTransactionSynchronizationManager {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
transactionContext.setSynchronizations(new LinkedHashSet<>());
this.transactionContext.setSynchronizations(new LinkedHashSet<>());
}
/**
@ -251,7 +225,6 @@ public class ReactiveTransactionSynchronizationManager {
* <p>Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
*
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
@ -260,22 +233,22 @@ public class ReactiveTransactionSynchronizationManager {
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
Set<ReactiveTransactionSynchronization> synchs = this.transactionContext.getSynchronizations();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
transactionContext.getSynchronizations().add(synchronization);
synchs.add(synchronization);
}
/**
* Return an unmodifiable snapshot list of all registered synchronizations
* for the current context.
*
* @return unmodifiable List of TransactionSynchronization instances
* @throws IllegalStateException if synchronization is not active
* @see TransactionSynchronization
* @see ReactiveTransactionSynchronization
*/
public List<ReactiveTransactionSynchronization> getSynchronizations() throws IllegalStateException {
Set<ReactiveTransactionSynchronization> synchs = transactionContext.getSynchronizations();
Set<ReactiveTransactionSynchronization> synchs = this.transactionContext.getSynchronizations();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
@ -284,7 +257,8 @@ public class ReactiveTransactionSynchronizationManager {
// might register further synchronizations.
if (synchs.isEmpty()) {
return Collections.emptyList();
} else {
}
else {
// Sort lazily here, not in registerSynchronization.
List<ReactiveTransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
AnnotationAwareOrderComparator.sort(sortedSynchs);
@ -295,7 +269,6 @@ public class ReactiveTransactionSynchronizationManager {
/**
* Deactivate transaction synchronization for the current context.
* Called by the transaction manager on transaction cleanup.
*
* @throws IllegalStateException if synchronization is not active
*/
public void clearSynchronization() throws IllegalStateException {
@ -303,9 +276,10 @@ public class ReactiveTransactionSynchronizationManager {
throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
}
logger.trace("Clearing transaction synchronization");
transactionContext.setSynchronizations(null);
this.transactionContext.setSynchronizations(null);
}
//-------------------------------------------------------------------------
// Exposure of transaction characteristics
//-------------------------------------------------------------------------
@ -313,59 +287,53 @@ public class ReactiveTransactionSynchronizationManager {
/**
* Expose the name of the current transaction, if any.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param name the name of the transaction, or {@code null} to reset it
* @see org.springframework.transaction.TransactionDefinition#getName()
*/
public void setCurrentTransactionName(@Nullable String name) {
transactionContext.setCurrentTransactionName(name);
this.transactionContext.setCurrentTransactionName(name);
}
/**
* Return the name of the current transaction, or {@code null} if none set.
* To be called by resource management code for optimizations per use case,
* for example to optimize fetch strategies for specific named transactions.
*
* @see org.springframework.transaction.TransactionDefinition#getName()
*/
@Nullable
public String getCurrentTransactionName() {
return transactionContext.getCurrentTransactionName();
return this.transactionContext.getCurrentTransactionName();
}
/**
* Expose a read-only flag for the current transaction.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param readOnly {@code true} to mark the current transaction
* as read-only; {@code false} to reset such a read-only marker
* @see org.springframework.transaction.TransactionDefinition#isReadOnly()
*/
public void setCurrentTransactionReadOnly(boolean readOnly) {
transactionContext.setCurrentTransactionReadOnly(readOnly);
this.transactionContext.setCurrentTransactionReadOnly(readOnly);
}
/**
* Return whether the current transaction is marked as read-only.
* To be called by resource management code when preparing a newly
* created resource (for example, a Hibernate Session).
* created resource.
* <p>Note that transaction synchronizations receive the read-only flag
* as argument for the {@code beforeCommit} callback, to be able
* to suppress change detection on commit. The present method is meant
* to be used for earlier read-only checks, for example to set the
* flush mode of a Hibernate Session to "FlushMode.NEVER" upfront.
*
* to be used for earlier read-only checks.
* @see org.springframework.transaction.TransactionDefinition#isReadOnly()
* @see TransactionSynchronization#beforeCommit(boolean)
* @see ReactiveTransactionSynchronization#beforeCommit(boolean)
*/
public boolean isCurrentTransactionReadOnly() {
return transactionContext.isCurrentTransactionReadOnly();
return this.transactionContext.isCurrentTransactionReadOnly();
}
/**
* Expose an isolation level for the current transaction.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param isolationLevel the isolation level to expose, according to the
* R2DBC Connection constants (equivalent to the corresponding Spring
* TransactionDefinition constants), or {@code null} to reset it
@ -376,14 +344,13 @@ public class ReactiveTransactionSynchronizationManager {
* @see org.springframework.transaction.TransactionDefinition#getIsolationLevel()
*/
public void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {
transactionContext.setCurrentTransactionIsolationLevel(isolationLevel);
this.transactionContext.setCurrentTransactionIsolationLevel(isolationLevel);
}
/**
* Return the isolation level for the current transaction, if any.
* To be called by resource management code when preparing a newly
* created resource (for example, a R2DBC Connection).
*
* @return the currently exposed isolation level, according to the
* R2DBC Connection constants (equivalent to the corresponding Spring
* TransactionDefinition constants), or {@code null} if none
@ -395,18 +362,17 @@ public class ReactiveTransactionSynchronizationManager {
*/
@Nullable
public Integer getCurrentTransactionIsolationLevel() {
return transactionContext.getCurrentTransactionIsolationLevel();
return this.transactionContext.getCurrentTransactionIsolationLevel();
}
/**
* Expose whether there currently is an actual transaction active.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param active {@code true} to mark the current context as being associated
* with an actual transaction; {@code false} to reset that marker
*/
public void setActualTransactionActive(boolean active) {
transactionContext.setActualTransactionActive(active);
this.transactionContext.setActualTransactionActive(active);
}
/**
@ -418,17 +384,15 @@ public class ReactiveTransactionSynchronizationManager {
* resource transaction; also on PROPAGATION_SUPPORTS) and an actual
* transaction being active (with backing resource transaction;
* on PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, etc).
*
* @see #isSynchronizationActive()
*/
public boolean isActualTransactionActive() {
return transactionContext.isActualTransactionActive();
return this.transactionContext.isActualTransactionActive();
}
/**
* Clear the entire transaction synchronization state:
* registered synchronizations as well as the various transaction characteristics.
*
* @see #clearSynchronization()
* @see #setCurrentTransactionName
* @see #setCurrentTransactionReadOnly
@ -436,11 +400,11 @@ public class ReactiveTransactionSynchronizationManager {
* @see #setActualTransactionActive
*/
public void clear() {
transactionContext.clear();
this.transactionContext.clear();
}
private Map<Object, Object> getResources() {
return transactionContext.getResources();
return this.transactionContext.getResources();
}
}

View File

@ -25,7 +25,6 @@ import reactor.core.publisher.Mono;
import org.springframework.aop.scope.ScopedObject;
import org.springframework.core.InfrastructureProxy;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
@ -34,11 +33,12 @@ import org.springframework.util.ClassUtils;
* callback methods on all currently registered synchronizations.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see ReactiveTransactionSynchronization
* @see ReactiveTransactionSynchronizationManager#getSynchronizations()
*/
public abstract class ReactiveTransactionSynchronizationUtils {
abstract class ReactiveTransactionSynchronizationUtils {
private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationUtils.class);
@ -66,49 +66,14 @@ public abstract class ReactiveTransactionSynchronizationUtils {
}
/**
* Trigger {@code flush} callbacks on all currently registered synchronizations.
* @throws RuntimeException if thrown by a {@code flush} callback
* @see ReactiveTransactionSynchronization#flush()
*/
public static Mono<Void> triggerFlush() {
return TransactionContextManager.currentContext().flatMapIterable(TransactionContext::getSynchronizations).concatMap(ReactiveTransactionSynchronization::flush).then();
}
/**
* Trigger {@code beforeCommit} callbacks on all currently registered synchronizations.
*
* @param readOnly whether the transaction is defined as read-only transaction
* @throws RuntimeException if thrown by a {@code beforeCommit} callback
* @see ReactiveTransactionSynchronization#beforeCommit(boolean)
*/
public static Mono<Void> triggerBeforeCommit(boolean readOnly) {
return TransactionContextManager.currentContext()
.map(TransactionContext::getSynchronizations)
.flatMap(it -> triggerBeforeCommit(it, readOnly)).then();
}
/**
* Actually invoke the {@code triggerBeforeCommit} methods of the
* given Spring ReactiveTransactionSynchronization objects.
*
* @param synchronizations a List of ReactiveTransactionSynchronization objects
* @see ReactiveTransactionSynchronization#beforeCommit(boolean)
*/
public static Mono<Void> triggerBeforeCommit(Collection<ReactiveTransactionSynchronization> synchronizations, boolean readOnly) {
return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly))
.then();
}
/**
* Trigger {@code beforeCompletion} callbacks on all currently registered synchronizations.
* @see ReactiveTransactionSynchronization#beforeCompletion()
*/
public static Mono<Void> triggerBeforeCompletion() {
return TransactionContextManager.currentContext()
.map(TransactionContext::getSynchronizations)
.flatMap(ReactiveTransactionSynchronizationUtils::triggerBeforeCompletion);
return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly)).then();
}
/**
@ -118,29 +83,16 @@ public abstract class ReactiveTransactionSynchronizationUtils {
* @see ReactiveTransactionSynchronization#beforeCompletion()
*/
public static Mono<Void> triggerBeforeCompletion(Collection<ReactiveTransactionSynchronization> synchronizations) {
return Flux.fromIterable(synchronizations)
.concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> {
logger.error("TransactionSynchronization.beforeCompletion threw exception", t);
}).then();
}
/**
* Trigger {@code afterCommit} callbacks on all currently registered synchronizations.
* @throws RuntimeException if thrown by a {@code afterCommit} callback
* @see ReactiveTransactionSynchronizationManager#getSynchronizations()
* @see ReactiveTransactionSynchronization#afterCommit()
*/
public static Mono<Void> triggerAfterCommit() {
return TransactionContextManager.currentContext()
.flatMap(it -> invokeAfterCommit(it.getSynchronizations()));
.concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) ->
logger.error("TransactionSynchronization.beforeCompletion threw exception", t)).then();
}
/**
* Actually invoke the {@code afterCommit} methods of the
* given Spring ReactiveTransactionSynchronization objects.
* @param synchronizations a List of ReactiveTransactionSynchronization objects
* @see TransactionSynchronization#afterCommit()
* @see ReactiveTransactionSynchronization#afterCommit()
*/
public static Mono<Void> invokeAfterCommit(Collection<ReactiveTransactionSynchronization> synchronizations) {
return Flux.fromIterable(synchronizations)
@ -148,21 +100,6 @@ public abstract class ReactiveTransactionSynchronizationUtils {
.then();
}
/**
* Trigger {@code afterCompletion} callbacks on all currently registered synchronizations.
* @param completionStatus the completion status according to the
* constants in the ReactiveTransactionSynchronization interface
* @see ReactiveTransactionSynchronizationManager#getSynchronizations()
* @see ReactiveTransactionSynchronization#afterCompletion(int)
* @see ReactiveTransactionSynchronization#STATUS_COMMITTED
* @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK
* @see ReactiveTransactionSynchronization#STATUS_UNKNOWN
*/
public static Mono<Void> triggerAfterCompletion(int completionStatus) {
return TransactionContextManager.currentContext()
.flatMap(it -> invokeAfterCompletion(it.getSynchronizations(), completionStatus));
}
/**
* Actually invoke the {@code afterCompletion} methods of the
* given Spring ReactiveTransactionSynchronization objects.
@ -174,13 +111,11 @@ public abstract class ReactiveTransactionSynchronizationUtils {
* @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK
* @see ReactiveTransactionSynchronization#STATUS_UNKNOWN
*/
public static Mono<Void> invokeAfterCompletion(Collection<ReactiveTransactionSynchronization> synchronizations,
int completionStatus) {
public static Mono<Void> invokeAfterCompletion(
Collection<ReactiveTransactionSynchronization> synchronizations, int completionStatus) {
return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus))
.onErrorContinue((t, o) -> {
logger.error("TransactionSynchronization.afterCompletion threw exception", t);
}).then();
.onErrorContinue((t, o) -> logger.error("TransactionSynchronization.afterCompletion threw exception", t)).then();
}
@ -189,7 +124,7 @@ public abstract class ReactiveTransactionSynchronizationUtils {
*/
private static class ScopedProxyUnwrapper {
static Object unwrapIfNecessary(Object resource) {
public static Object unwrapIfNecessary(Object resource) {
if (resource instanceof ScopedObject) {
return ((ScopedObject) resource).getTargetObject();
}

View File

@ -31,17 +31,21 @@ import org.springframework.util.StringUtils;
* from the subscriber context.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see TransactionContextManager
* @see reactor.util.context.Context
*/
public class TransactionContext {
private final @Nullable TransactionContext parent;
private final UUID contextId = UUID.randomUUID();
private final Map<Object, Object> resources = new LinkedHashMap<>();
private @Nullable Set<ReactiveTransactionSynchronization> synchronizations;
@Nullable
private Set<ReactiveTransactionSynchronization> synchronizations;
private volatile @Nullable String currentTransactionName;
@ -51,8 +55,6 @@ public class TransactionContext {
private volatile boolean actualTransactionActive;
private final @Nullable TransactionContext parent;
TransactionContext() {
this(null);
@ -63,78 +65,76 @@ public class TransactionContext {
}
public void clear() {
synchronizations = null;
currentTransactionName = null;
currentTransactionReadOnly = false;
currentTransactionIsolationLevel = null;
actualTransactionActive = false;
@Nullable
public TransactionContext getParent() {
return this.parent;
}
public String getName() {
if (StringUtils.hasText(currentTransactionName)) {
return contextId + ": " + currentTransactionName;
if (StringUtils.hasText(this.currentTransactionName)) {
return this.contextId + ": " + this.currentTransactionName;
}
return contextId.toString();
return this.contextId.toString();
}
public UUID getContextId() {
return contextId;
return this.contextId;
}
public Map<Object, Object> getResources() {
return resources;
return this.resources;
}
@Nullable
public Set<ReactiveTransactionSynchronization> getSynchronizations() {
return synchronizations;
}
public void setSynchronizations(@org.springframework.lang.Nullable Set<ReactiveTransactionSynchronization> synchronizations) {
public void setSynchronizations(@Nullable Set<ReactiveTransactionSynchronization> synchronizations) {
this.synchronizations = synchronizations;
}
@Nullable
public String getCurrentTransactionName() {
return currentTransactionName;
public Set<ReactiveTransactionSynchronization> getSynchronizations() {
return this.synchronizations;
}
public void setCurrentTransactionName(@Nullable String currentTransactionName) {
this.currentTransactionName = currentTransactionName;
}
public boolean isCurrentTransactionReadOnly() {
return currentTransactionReadOnly;
@Nullable
public String getCurrentTransactionName() {
return this.currentTransactionName;
}
public void setCurrentTransactionReadOnly(boolean currentTransactionReadOnly) {
this.currentTransactionReadOnly = currentTransactionReadOnly;
}
@Nullable
public Integer getCurrentTransactionIsolationLevel() {
return currentTransactionIsolationLevel;
public boolean isCurrentTransactionReadOnly() {
return this.currentTransactionReadOnly;
}
public void setCurrentTransactionIsolationLevel(@Nullable Integer currentTransactionIsolationLevel) {
this.currentTransactionIsolationLevel = currentTransactionIsolationLevel;
}
public boolean isActualTransactionActive() {
return actualTransactionActive;
@Nullable
public Integer getCurrentTransactionIsolationLevel() {
return this.currentTransactionIsolationLevel;
}
public void setActualTransactionActive(boolean actualTransactionActive) {
this.actualTransactionActive = actualTransactionActive;
}
@Nullable
public TransactionContext getParent() {
return parent;
public boolean isActualTransactionActive() {
return this.actualTransactionActive;
}
public void clear() {
this.synchronizations = null;
this.currentTransactionName = null;
this.currentTransactionReadOnly = false;
this.currentTransactionIsolationLevel = null;
this.actualTransactionActive = false;
}
}

View File

@ -13,61 +13,65 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import java.util.Stack;
import java.util.Deque;
import org.springframework.transaction.NoTransactionException;
/**
* Mutable holder for reactive transaction {@link TransactionContext contexts}.
* This holder keeps references to individual {@link TransactionContext}s.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see TransactionContext
*/
class TransactionContextHolder {
final class TransactionContextHolder {
private final Stack<TransactionContext> transactionStack;
private final Deque<TransactionContext> transactionStack;
TransactionContextHolder(Stack<TransactionContext> transactionStack) {
TransactionContextHolder(Deque<TransactionContext> transactionStack) {
this.transactionStack = transactionStack;
}
/**
* Return the current {@link TransactionContext}.
* @return the current {@link TransactionContext}.
* @throws NoTransactionException if no transaction is ongoing.
* @throws NoTransactionException if no transaction is ongoing
*/
TransactionContext currentContext() {
TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek());
TransactionContext context = this.transactionStack.peek();
if (context == null) {
throw new NoTransactionException("No transaction in context");
}
return context;
}
/**
* Create a new {@link TransactionContext}.
* @return the new {@link TransactionContext}.
*/
TransactionContext createContext() {
TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek());
return (context == null ? transactionStack.push(new TransactionContext()) :
transactionStack.push(new TransactionContext(context)));
TransactionContext context = this.transactionStack.peek();
if (context != null) {
context = new TransactionContext(context);
}
else {
context = new TransactionContext();
}
this.transactionStack.push(context);
return context;
}
/**
* Check whether the holder has a {@link TransactionContext}.
* @return {@literal true} if a {@link TransactionContext} is associated.
* @return {@literal true} if a {@link TransactionContext} is associated
*/
boolean hasContext() {
return !transactionStack.isEmpty();
return !this.transactionStack.isEmpty();
}
}

View File

@ -16,7 +16,7 @@
package org.springframework.transaction.reactive;
import java.util.Stack;
import java.util.ArrayDeque;
import java.util.function.Function;
import reactor.core.publisher.Flux;
@ -27,9 +27,9 @@ import org.springframework.transaction.NoTransactionException;
/**
* Delegate to register and obtain transactional contexts.
* <p/>
* Typically used by components that intercept or orchestrate transactional flows such as AOP interceptors or
* transactional operators.
*
* <p>Typically used by components that intercept or orchestrate transactional flows
* such as AOP interceptors or transactional operators.
*
* @author Mark Paluch
* @since 5.2
@ -37,9 +37,8 @@ import org.springframework.transaction.NoTransactionException;
*/
public abstract class TransactionContextManager {
private TransactionContextManager() {
/* prevent instantiation */
}
private TransactionContextManager() {
}
/**
@ -51,14 +50,11 @@ public abstract class TransactionContextManager {
* or no context found in a holder
*/
public static Mono<TransactionContext> currentContext() throws NoTransactionException {
return Mono.subscriberContext().handle((ctx, sink) -> {
if (ctx.hasKey(TransactionContext.class)) {
sink.next(ctx.get(TransactionContext.class));
return;
}
if (ctx.hasKey(TransactionContextHolder.class)) {
TransactionContextHolder holder = ctx.get(TransactionContextHolder.class);
if (holder.hasContext()) {
@ -66,7 +62,6 @@ public abstract class TransactionContextManager {
return;
}
}
sink.error(new NoTransactionException("No transaction in context"));
});
}
@ -74,9 +69,9 @@ public abstract class TransactionContextManager {
/**
* Create a {@link TransactionContext} and register it in the subscriber {@link Context}.
* @return functional context registration.
* @throws IllegalStateException if a transaction context is already associated.
* @see Mono#subscriberContext(Function)
* @see Flux#subscriberContext(Function)
* @throws IllegalStateException if a transaction context is already associated.
*/
public static Function<Context, Context> createTransactionContext() {
return context -> context.put(TransactionContext.class, new TransactionContext());
@ -91,13 +86,10 @@ public abstract class TransactionContextManager {
*/
public static Function<Context, Context> getOrCreateContext() {
return context -> {
TransactionContextHolder holder = context.get(TransactionContextHolder.class);
if (holder.hasContext()) {
context.put(TransactionContext.class, holder.currentContext());
}
return context.put(TransactionContext.class, holder.createContext());
};
}
@ -111,11 +103,9 @@ public abstract class TransactionContextManager {
* @return functional context registration.
*/
public static Function<Context, Context> getOrCreateContextHolder() {
return context -> {
if (!context.hasKey(TransactionContextHolder.class)) {
return context.put(TransactionContextHolder.class, new TransactionContextHolder(new Stack<>()));
return context.put(TransactionContextHolder.class, new TransactionContextHolder(new ArrayDeque<>()));
}
return context;
};

View File

@ -19,9 +19,9 @@ package org.springframework.transaction.reactive;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.ReactiveTransactionManager;
/**
* Operator class that simplifies programmatic transaction demarcation and
@ -38,40 +38,24 @@ import org.springframework.transaction.ReactiveTransactionManager;
* application services utilizing this class, making calls to the low-level
* services via an inner-class callback object.
*
* <p>Can be used within a service implementation via direct instantiation with
* a transaction manager reference, or get prepared in an application context
* and passed to services as bean reference. Note: The transaction manager should
* always be configured as bean in the application context: in the first case given
* to the service directly, in the second case given to the prepared template.
*
* <p>Supports setting the propagation behavior and the isolation level by name,
* for convenient configuration in context definitions.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
*/
public interface TransactionalOperator {
/**
* Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager}.
* @param transactionManager the transaction management strategy to be used
* @return the transactional operator
*/
static TransactionalOperator create(ReactiveTransactionManager transactionManager){
return new DefaultTransactionalOperator(transactionManager);
}
/**
* Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager}
* and {@link TransactionDefinition}.
*
* @param transactionManager the transaction management strategy to be used
* @param transactionDefinition the transaction definition to apply.
* @param transactionDefinition the transaction definition to apply
* @return the transactional operator
*/
static TransactionalOperator create(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){
static TransactionalOperator create(
ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){
return new DefaultTransactionalOperator(transactionManager, transactionDefinition);
}

View File

@ -1,5 +1,5 @@
/**
* Support classes for the org.springframework.transaction.reactive package.
* Support classes for reactive transaction management.
* Provides an abstract base class for reactive transaction manager implementations,
* and a transactional operator plus callback for transaction demarcation.
*/

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -48,7 +48,6 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) {
this.existingTransaction = existingTransaction;
this.canCreateTransaction = canCreateTransaction;
setTransactionSynchronization(SYNCHRONIZATION_NEVER);
}
@ -59,7 +58,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
@Override
protected boolean isExistingTransaction(Object transaction) {
return existingTransaction;
return this.existingTransaction;
}
@Override
@ -96,4 +95,5 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
}
return Mono.fromRunnable(() -> this.rollbackOnly = true);
}
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -87,7 +87,8 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void commitWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext())
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete();
assertHasBegan(tm);
@ -99,7 +100,7 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void rollbackWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getTransaction(null).flatMap(tm::rollback)
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
@ -112,7 +113,8 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void rollbackOnlyWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit)
tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly)
.flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
@ -125,7 +127,8 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void commitWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext())
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete();
assertHasNotBegan(tm);
@ -137,7 +140,7 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void rollbackWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getTransaction(null).flatMap(tm::rollback)
tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
@ -150,7 +153,7 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void rollbackOnlyWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit)
tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
@ -163,7 +166,7 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void transactionTemplate() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
TransactionalOperator operator = TransactionalOperator.create(tm);
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Flux.just("Walter").as(operator::transactional)
.as(StepVerifier::create)
@ -179,7 +182,7 @@ public class ReactiveTransactionSupportUnitTests {
@Test
public void transactionTemplateWithException() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
TransactionalOperator operator = TransactionalOperator.create(tm);
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
RuntimeException ex = new RuntimeException("Some application exception");
Mono.error(ex).as(operator::transactional)

View File

@ -21,6 +21,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import static org.junit.Assert.*;
/**
@ -32,57 +34,47 @@ public class TransactionalOperatorTests {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
@Test
public void commitWithMono() {
TransactionalOperator operator = TransactionalOperator.create(tm);
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Mono.just(true).as(operator::transactional)
.as(StepVerifier::create)
.expectNext(true)
.verifyComplete();
assertTrue(tm.commit);
assertFalse(tm.rollback);
}
@Test
public void rollbackWithMono() {
TransactionalOperator operator = TransactionalOperator.create(tm);
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Mono.error(new IllegalStateException()).as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalStateException.class);
assertFalse(tm.commit);
assertTrue(tm.rollback);
}
@Test
public void commitWithFlux() {
TransactionalOperator operator = TransactionalOperator.create(tm);
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Flux.just(true).as(operator::transactional)
.as(StepVerifier::create)
.expectNext(true)
.verifyComplete();
assertTrue(tm.commit);
assertFalse(tm.rollback);
}
@Test
public void rollbackWithFlux() {
TransactionalOperator operator = TransactionalOperator.create(tm);
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Flux.error(new IllegalStateException()).as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalStateException.class);
assertFalse(tm.commit);
assertTrue(tm.rollback);
}
}