Support for reactive transactions in TransactionInterceptor
Introduces TransactionManager marker interface for PlatformTransactionManager as well as ReactiveTransactionManager, allowing for a common configuration type in TransactionAspectSupport and TransactionManagementConfigurer. Closes gh-22590
This commit is contained in:
parent
8dabb3e626
commit
0be610b0ee
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -29,6 +29,7 @@ import org.springframework.beans.factory.ListableBeanFactory;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
import org.springframework.test.context.TestContext;
|
import org.springframework.test.context.TestContext;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
import org.springframework.transaction.TransactionManager;
|
||||||
import org.springframework.transaction.annotation.TransactionManagementConfigurer;
|
import org.springframework.transaction.annotation.TransactionManagementConfigurer;
|
||||||
import org.springframework.transaction.interceptor.DelegatingTransactionAttribute;
|
import org.springframework.transaction.interceptor.DelegatingTransactionAttribute;
|
||||||
import org.springframework.transaction.interceptor.TransactionAttribute;
|
import org.springframework.transaction.interceptor.TransactionAttribute;
|
||||||
|
@ -202,7 +203,14 @@ public abstract class TestContextTransactionUtils {
|
||||||
Assert.state(configurers.size() <= 1,
|
Assert.state(configurers.size() <= 1,
|
||||||
"Only one TransactionManagementConfigurer may exist in the ApplicationContext");
|
"Only one TransactionManagementConfigurer may exist in the ApplicationContext");
|
||||||
if (configurers.size() == 1) {
|
if (configurers.size() == 1) {
|
||||||
return configurers.values().iterator().next().annotationDrivenTransactionManager();
|
TransactionManager tm = configurers.values().iterator().next().annotationDrivenTransactionManager();
|
||||||
|
if (tm instanceof PlatformTransactionManager) {
|
||||||
|
return (PlatformTransactionManager) tm;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Specified transaction manager is not a PlatformTransactionManager: " + tm);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -43,7 +43,7 @@ import org.springframework.lang.Nullable;
|
||||||
* @see org.springframework.transaction.interceptor.TransactionInterceptor
|
* @see org.springframework.transaction.interceptor.TransactionInterceptor
|
||||||
* @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean
|
* @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean
|
||||||
*/
|
*/
|
||||||
public interface PlatformTransactionManager {
|
public interface PlatformTransactionManager extends TransactionManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a currently active transaction or create a new one, according to
|
* Return a currently active transaction or create a new one, according to
|
||||||
|
|
|
@ -27,9 +27,8 @@ import reactor.core.publisher.Mono;
|
||||||
* @author Mark Paluch
|
* @author Mark Paluch
|
||||||
* @author Juergen Hoeller
|
* @author Juergen Hoeller
|
||||||
* @since 5.2
|
* @since 5.2
|
||||||
* @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean
|
|
||||||
*/
|
*/
|
||||||
public interface ReactiveTransactionManager {
|
public interface ReactiveTransactionManager extends TransactionManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Emit a currently active reactive transaction or create a new one, according to
|
* Emit a currently active reactive transaction or create a new one, according to
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2019 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.transaction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marker interface for Spring transaction manager implementations,
|
||||||
|
* either traditional or reactive.
|
||||||
|
*
|
||||||
|
* @author Juergen Hoeller
|
||||||
|
* @since 5.2
|
||||||
|
* @see PlatformTransactionManager
|
||||||
|
* @see ReactiveTransactionManager
|
||||||
|
*/
|
||||||
|
public interface TransactionManager {
|
||||||
|
|
||||||
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -27,7 +27,7 @@ import org.springframework.context.annotation.Role;
|
||||||
import org.springframework.core.annotation.AnnotationAttributes;
|
import org.springframework.core.annotation.AnnotationAttributes;
|
||||||
import org.springframework.core.type.AnnotationMetadata;
|
import org.springframework.core.type.AnnotationMetadata;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.TransactionManager;
|
||||||
import org.springframework.transaction.config.TransactionManagementConfigUtils;
|
import org.springframework.transaction.config.TransactionManagementConfigUtils;
|
||||||
import org.springframework.transaction.event.TransactionalEventListenerFactory;
|
import org.springframework.transaction.event.TransactionalEventListenerFactory;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
@ -51,7 +51,7 @@ public abstract class AbstractTransactionManagementConfiguration implements Impo
|
||||||
* Default transaction manager, as configured through a {@link TransactionManagementConfigurer}.
|
* Default transaction manager, as configured through a {@link TransactionManagementConfigurer}.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
protected PlatformTransactionManager txManager;
|
protected TransactionManager txManager;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2016 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -16,15 +16,16 @@
|
||||||
|
|
||||||
package org.springframework.transaction.annotation;
|
package org.springframework.transaction.annotation;
|
||||||
|
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.TransactionManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface to be implemented by @{@link org.springframework.context.annotation.Configuration
|
* Interface to be implemented by @{@link org.springframework.context.annotation.Configuration
|
||||||
* Configuration} classes annotated with @{@link EnableTransactionManagement} that wish to
|
* Configuration} classes annotated with @{@link EnableTransactionManagement} that wish to
|
||||||
* or need to explicitly specify the default {@link PlatformTransactionManager} bean to be
|
* (or need to) explicitly specify the default {@code PlatformTransactionManager} bean
|
||||||
* used for annotation-driven transaction management, as opposed to the default approach
|
* (or {@code ReactiveTransactionManager} bean) to be used for annotation-driven
|
||||||
* of a by-type lookup. One reason this might be necessary is if there are two
|
* transaction management, as opposed to the default approach of a by-type lookup.
|
||||||
* {@code PlatformTransactionManager} beans present in the container.
|
* One reason this might be necessary is if there are two {@code PlatformTransactionManager}
|
||||||
|
* beans present in the container.
|
||||||
*
|
*
|
||||||
* <p>See @{@link EnableTransactionManagement} for general examples and context;
|
* <p>See @{@link EnableTransactionManagement} for general examples and context;
|
||||||
* see {@link #annotationDrivenTransactionManager()} for detailed instructions.
|
* see {@link #annotationDrivenTransactionManager()} for detailed instructions.
|
||||||
|
@ -40,6 +41,8 @@ import org.springframework.transaction.PlatformTransactionManager;
|
||||||
* @since 3.1
|
* @since 3.1
|
||||||
* @see EnableTransactionManagement
|
* @see EnableTransactionManagement
|
||||||
* @see org.springframework.context.annotation.Primary
|
* @see org.springframework.context.annotation.Primary
|
||||||
|
* @see org.springframework.transaction.PlatformTransactionManager
|
||||||
|
* @see org.springframework.transaction.ReactiveTransactionManager
|
||||||
*/
|
*/
|
||||||
public interface TransactionManagementConfigurer {
|
public interface TransactionManagementConfigurer {
|
||||||
|
|
||||||
|
@ -76,7 +79,9 @@ public interface TransactionManagementConfigurer {
|
||||||
* container as all {@code PlatformTransactionManager} implementations take advantage
|
* container as all {@code PlatformTransactionManager} implementations take advantage
|
||||||
* of Spring lifecycle callbacks such as {@code InitializingBean} and
|
* of Spring lifecycle callbacks such as {@code InitializingBean} and
|
||||||
* {@code BeanFactoryAware}.
|
* {@code BeanFactoryAware}.
|
||||||
|
* @return a {@link org.springframework.transaction.PlatformTransactionManager} or
|
||||||
|
* {@link org.springframework.transaction.ReactiveTransactionManager} implementation
|
||||||
*/
|
*/
|
||||||
PlatformTransactionManager annotationDrivenTransactionManager();
|
TransactionManager annotationDrivenTransactionManager();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,17 +23,25 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
import io.vavr.control.Try;
|
import io.vavr.control.Try;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.beans.factory.BeanFactory;
|
import org.springframework.beans.factory.BeanFactory;
|
||||||
import org.springframework.beans.factory.BeanFactoryAware;
|
import org.springframework.beans.factory.BeanFactoryAware;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||||
import org.springframework.core.NamedThreadLocal;
|
import org.springframework.core.NamedThreadLocal;
|
||||||
|
import org.springframework.core.ReactiveAdapter;
|
||||||
|
import org.springframework.core.ReactiveAdapterRegistry;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
import org.springframework.transaction.NoTransactionException;
|
import org.springframework.transaction.NoTransactionException;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
import org.springframework.transaction.ReactiveTransaction;
|
||||||
|
import org.springframework.transaction.ReactiveTransactionManager;
|
||||||
|
import org.springframework.transaction.TransactionManager;
|
||||||
import org.springframework.transaction.TransactionStatus;
|
import org.springframework.transaction.TransactionStatus;
|
||||||
import org.springframework.transaction.TransactionSystemException;
|
import org.springframework.transaction.TransactionSystemException;
|
||||||
|
import org.springframework.transaction.reactive.TransactionContextManager;
|
||||||
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
|
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.ClassUtils;
|
import org.springframework.util.ClassUtils;
|
||||||
|
@ -86,6 +94,12 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
private static final boolean vavrPresent = ClassUtils.isPresent(
|
private static final boolean vavrPresent = ClassUtils.isPresent(
|
||||||
"io.vavr.control.Try", TransactionAspectSupport.class.getClassLoader());
|
"io.vavr.control.Try", TransactionAspectSupport.class.getClassLoader());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reactive Streams API present on the classpath?
|
||||||
|
*/
|
||||||
|
private static final boolean reactiveStreamsPresent =
|
||||||
|
ClassUtils.isPresent("org.reactivestreams.Publisher", TransactionAspectSupport.class.getClassLoader());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holder to support the {@code currentTransactionStatus()} method,
|
* Holder to support the {@code currentTransactionStatus()} method,
|
||||||
* and to support communication between different cooperating advices
|
* and to support communication between different cooperating advices
|
||||||
|
@ -136,11 +150,14 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
|
|
||||||
protected final Log logger = LogFactory.getLog(getClass());
|
protected final Log logger = LogFactory.getLog(getClass());
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private final ReactiveAdapterRegistry reactiveAdapterRegistry;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private String transactionManagerBeanName;
|
private String transactionManagerBeanName;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private PlatformTransactionManager transactionManager;
|
private TransactionManager transactionManager;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private TransactionAttributeSource transactionAttributeSource;
|
private TransactionAttributeSource transactionAttributeSource;
|
||||||
|
@ -148,12 +165,23 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
@Nullable
|
@Nullable
|
||||||
private BeanFactory beanFactory;
|
private BeanFactory beanFactory;
|
||||||
|
|
||||||
private final ConcurrentMap<Object, PlatformTransactionManager> transactionManagerCache =
|
private final ConcurrentMap<Object, Object> transactionManagerCache = new ConcurrentReferenceHashMap<>(4);
|
||||||
new ConcurrentReferenceHashMap<>(4);
|
|
||||||
|
|
||||||
|
protected TransactionAspectSupport() {
|
||||||
|
if (reactiveStreamsPresent) {
|
||||||
|
this.reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.reactiveAdapterRegistry = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specify the name of the default transaction manager bean.
|
* Specify the name of the default transaction manager bean.
|
||||||
|
* This can either point to a traditional {@link PlatformTransactionManager} or a
|
||||||
|
* {@link ReactiveTransactionManager} for reactive transaction management.
|
||||||
*/
|
*/
|
||||||
public void setTransactionManagerBeanName(@Nullable String transactionManagerBeanName) {
|
public void setTransactionManagerBeanName(@Nullable String transactionManagerBeanName) {
|
||||||
this.transactionManagerBeanName = transactionManagerBeanName;
|
this.transactionManagerBeanName = transactionManagerBeanName;
|
||||||
|
@ -169,20 +197,24 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specify the <em>default</em> transaction manager to use to drive transactions.
|
* Specify the <em>default</em> transaction manager to use to drive transactions.
|
||||||
|
* This can either be a traditional {@link PlatformTransactionManager} or a
|
||||||
|
* {@link ReactiveTransactionManager} for reactive transaction management.
|
||||||
* <p>The default transaction manager will be used if a <em>qualifier</em>
|
* <p>The default transaction manager will be used if a <em>qualifier</em>
|
||||||
* has not been declared for a given transaction or if an explicit name for the
|
* has not been declared for a given transaction or if an explicit name for the
|
||||||
* default transaction manager bean has not been specified.
|
* default transaction manager bean has not been specified.
|
||||||
* @see #setTransactionManagerBeanName
|
* @see #setTransactionManagerBeanName
|
||||||
*/
|
*/
|
||||||
public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
|
public void setTransactionManager(@Nullable TransactionManager transactionManager) {
|
||||||
this.transactionManager = transactionManager;
|
this.transactionManager = transactionManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the default transaction manager, or {@code null} if unknown.
|
* Return the default transaction manager, or {@code null} if unknown.
|
||||||
|
* This can either be a traditional {@link PlatformTransactionManager} or a
|
||||||
|
* {@link ReactiveTransactionManager} for reactive transaction management.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public PlatformTransactionManager getTransactionManager() {
|
public TransactionManager getTransactionManager() {
|
||||||
return this.transactionManager;
|
return this.transactionManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,6 +317,13 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
|
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
|
||||||
final InvocationCallback invocation) throws Throwable {
|
final InvocationCallback invocation) throws Throwable {
|
||||||
|
|
||||||
|
if (this.reactiveAdapterRegistry != null) {
|
||||||
|
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
|
||||||
|
if (adapter != null) {
|
||||||
|
return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If the transaction attribute is null, the method is non-transactional.
|
// If the transaction attribute is null, the method is non-transactional.
|
||||||
TransactionAttributeSource tas = getTransactionAttributeSource();
|
TransactionAttributeSource tas = getTransactionAttributeSource();
|
||||||
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
|
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
|
||||||
|
@ -398,7 +437,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
|
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
|
||||||
// Do not attempt to lookup tx manager if no tx attributes are set
|
// Do not attempt to lookup tx manager if no tx attributes are set
|
||||||
if (txAttr == null || this.beanFactory == null) {
|
if (txAttr == null || this.beanFactory == null) {
|
||||||
return getTransactionManager();
|
return asPlatformTransactionManager(getTransactionManager());
|
||||||
}
|
}
|
||||||
|
|
||||||
String qualifier = txAttr.getQualifier();
|
String qualifier = txAttr.getQualifier();
|
||||||
|
@ -409,9 +448,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
|
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
PlatformTransactionManager defaultTransactionManager = getTransactionManager();
|
PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager());
|
||||||
if (defaultTransactionManager == null) {
|
if (defaultTransactionManager == null) {
|
||||||
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
|
defaultTransactionManager = asPlatformTransactionManager(this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
|
||||||
if (defaultTransactionManager == null) {
|
if (defaultTransactionManager == null) {
|
||||||
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
|
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
|
||||||
this.transactionManagerCache.putIfAbsent(
|
this.transactionManagerCache.putIfAbsent(
|
||||||
|
@ -423,7 +462,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
}
|
}
|
||||||
|
|
||||||
private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
|
private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
|
||||||
PlatformTransactionManager txManager = this.transactionManagerCache.get(qualifier);
|
PlatformTransactionManager txManager = asPlatformTransactionManager(this.transactionManagerCache.get(qualifier));
|
||||||
if (txManager == null) {
|
if (txManager == null) {
|
||||||
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
|
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
|
||||||
beanFactory, PlatformTransactionManager.class, qualifier);
|
beanFactory, PlatformTransactionManager.class, qualifier);
|
||||||
|
@ -432,6 +471,18 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
return txManager;
|
return txManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private PlatformTransactionManager asPlatformTransactionManager(@Nullable Object transactionManager) {
|
||||||
|
if (transactionManager == null || transactionManager instanceof PlatformTransactionManager) {
|
||||||
|
return (PlatformTransactionManager) transactionManager;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Specified transaction manager is not a PlatformTransactionManager: " + transactionManager);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String methodIdentification(Method method, @Nullable Class<?> targetClass,
|
private String methodIdentification(Method method, @Nullable Class<?> targetClass,
|
||||||
@Nullable TransactionAttribute txAttr) {
|
@Nullable TransactionAttribute txAttr) {
|
||||||
|
|
||||||
|
@ -614,7 +665,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Opaque object used to hold Transaction information. Subclasses
|
* Opaque object used to hold transaction information. Subclasses
|
||||||
* must pass it back to methods on this class, but not see its internals.
|
* must pass it back to methods on this class, but not see its internals.
|
||||||
*/
|
*/
|
||||||
protected static final class TransactionInfo {
|
protected static final class TransactionInfo {
|
||||||
|
@ -753,4 +804,286 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delegate for Reactor-based management of transactional methods with a
|
||||||
|
* reactive return type.
|
||||||
|
*/
|
||||||
|
private class ReactiveTransactionSupport {
|
||||||
|
|
||||||
|
private final ReactiveAdapter adapter;
|
||||||
|
|
||||||
|
public ReactiveTransactionSupport(ReactiveAdapter adapter) {
|
||||||
|
this.adapter = adapter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, InvocationCallback invocation) {
|
||||||
|
// If the transaction attribute is null, the method is non-transactional.
|
||||||
|
TransactionAttributeSource tas = getTransactionAttributeSource();
|
||||||
|
TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
|
||||||
|
ReactiveTransactionManager tm = determineTransactionManager(txAttr);
|
||||||
|
String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
|
||||||
|
|
||||||
|
// Optimize for Mono
|
||||||
|
if (Mono.class.isAssignableFrom(method.getReturnType())) {
|
||||||
|
return TransactionContextManager.currentContext().flatMap(context -> {
|
||||||
|
// Standard transaction demarcation with getTransaction and commit/rollback calls.
|
||||||
|
Mono<ReactiveTransactionInfo> txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
|
||||||
|
return txInfo.flatMap(it -> {
|
||||||
|
try {
|
||||||
|
// This is an around advice: Invoke the next interceptor in the chain.
|
||||||
|
// This will normally result in a target object being invoked.
|
||||||
|
Mono<Object> retVal = (Mono) invocation.proceedWithInvocation();
|
||||||
|
return retVal
|
||||||
|
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))).materialize()
|
||||||
|
.flatMap(signal -> {
|
||||||
|
if (signal.isOnComplete() || signal.isOnNext()) {
|
||||||
|
return commitTransactionAfterReturning(it).thenReturn(signal);
|
||||||
|
}
|
||||||
|
return Mono.just(signal);
|
||||||
|
}).dematerialize();
|
||||||
|
}
|
||||||
|
catch (Throwable ex) {
|
||||||
|
// target invocation exception
|
||||||
|
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).subscriberContext(TransactionContextManager.getOrCreateContext())
|
||||||
|
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
|
||||||
|
}
|
||||||
|
|
||||||
|
return TransactionContextManager.currentContext().flatMapMany(context -> {
|
||||||
|
// Standard transaction demarcation with getTransaction and commit/rollback calls.
|
||||||
|
Mono<ReactiveTransactionInfo> txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
|
||||||
|
return txInfo.flatMapMany(it -> {
|
||||||
|
try {
|
||||||
|
// This is an around advice: Invoke the next interceptor in the chain.
|
||||||
|
// This will normally result in a target object being invoked.
|
||||||
|
Flux<Object> retVal = Flux.from(this.adapter.toPublisher(invocation.proceedWithInvocation()));
|
||||||
|
return retVal
|
||||||
|
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)))
|
||||||
|
.materialize().flatMap(signal -> {
|
||||||
|
if (signal.isOnComplete()) {
|
||||||
|
return commitTransactionAfterReturning(it).materialize();
|
||||||
|
}
|
||||||
|
return Mono.just(signal);
|
||||||
|
}).dematerialize();
|
||||||
|
}
|
||||||
|
catch (Throwable ex) {
|
||||||
|
// target invocation exception
|
||||||
|
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).subscriberContext(TransactionContextManager.getOrCreateContext())
|
||||||
|
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private ReactiveTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
|
||||||
|
// Do not attempt to lookup tx manager if no tx attributes are set
|
||||||
|
if (txAttr == null || beanFactory == null) {
|
||||||
|
return asReactiveTransactionManager(getTransactionManager());
|
||||||
|
}
|
||||||
|
|
||||||
|
String qualifier = txAttr.getQualifier();
|
||||||
|
if (StringUtils.hasText(qualifier)) {
|
||||||
|
return determineQualifiedTransactionManager(beanFactory, qualifier);
|
||||||
|
}
|
||||||
|
else if (StringUtils.hasText(transactionManagerBeanName)) {
|
||||||
|
return determineQualifiedTransactionManager(beanFactory, transactionManagerBeanName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ReactiveTransactionManager defaultTransactionManager = asReactiveTransactionManager(getTransactionManager());
|
||||||
|
if (defaultTransactionManager == null) {
|
||||||
|
defaultTransactionManager = asReactiveTransactionManager(transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
|
||||||
|
if (defaultTransactionManager == null) {
|
||||||
|
defaultTransactionManager = beanFactory.getBean(ReactiveTransactionManager.class);
|
||||||
|
transactionManagerCache.putIfAbsent(
|
||||||
|
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return defaultTransactionManager;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReactiveTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
|
||||||
|
ReactiveTransactionManager txManager = asReactiveTransactionManager(transactionManagerCache.get(qualifier));
|
||||||
|
if (txManager == null) {
|
||||||
|
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
|
||||||
|
beanFactory, ReactiveTransactionManager.class, qualifier);
|
||||||
|
transactionManagerCache.putIfAbsent(qualifier, txManager);
|
||||||
|
}
|
||||||
|
return txManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private ReactiveTransactionManager asReactiveTransactionManager(@Nullable Object transactionManager) {
|
||||||
|
if (transactionManager == null || transactionManager instanceof ReactiveTransactionManager) {
|
||||||
|
return (ReactiveTransactionManager) transactionManager;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Specified transaction manager is not a ReactiveTransactionManager: " + transactionManager);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
private Mono<ReactiveTransactionInfo> createTransactionIfNecessary(@Nullable ReactiveTransactionManager tm,
|
||||||
|
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
|
||||||
|
|
||||||
|
// If no name specified, apply method identification as transaction name.
|
||||||
|
if (txAttr != null && txAttr.getName() == null) {
|
||||||
|
txAttr = new DelegatingTransactionAttribute(txAttr) {
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return joinpointIdentification;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
TransactionAttribute attrToUse = txAttr;
|
||||||
|
|
||||||
|
Mono<ReactiveTransaction> tx = Mono.empty();
|
||||||
|
if (txAttr != null) {
|
||||||
|
if (tm != null) {
|
||||||
|
tx = tm.getReactiveTransaction(txAttr);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
|
||||||
|
"] because no transaction manager has been configured");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it))
|
||||||
|
.switchIfEmpty(Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null))));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReactiveTransactionInfo prepareTransactionInfo(@Nullable ReactiveTransactionManager tm,
|
||||||
|
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
|
||||||
|
@Nullable ReactiveTransaction transaction) {
|
||||||
|
|
||||||
|
ReactiveTransactionInfo txInfo = new ReactiveTransactionInfo(tm, txAttr, joinpointIdentification);
|
||||||
|
if (txAttr != null) {
|
||||||
|
// We need a transaction for this method...
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
|
||||||
|
}
|
||||||
|
// The transaction manager will flag an error if an incompatible tx already exists.
|
||||||
|
txInfo.newReactiveTransaction(transaction);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// The TransactionInfo.hasTransaction() method will return false. We created it only
|
||||||
|
// to preserve the integrity of the ThreadLocal stack maintained in this class.
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Don't need to create transaction for [" + joinpointIdentification +
|
||||||
|
"]: This method isn't transactional.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return txInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Mono<Void> commitTransactionAfterReturning(@Nullable ReactiveTransactionInfo txInfo) {
|
||||||
|
if (txInfo != null && txInfo.getReactiveTransaction() != null) {
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
|
||||||
|
}
|
||||||
|
return txInfo.getTransactionManager().commit(txInfo.getReactiveTransaction());
|
||||||
|
}
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactionInfo txInfo, Throwable ex) {
|
||||||
|
if (txInfo != null && txInfo.getReactiveTransaction() != null) {
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
|
||||||
|
"] after exception: " + ex);
|
||||||
|
}
|
||||||
|
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
|
||||||
|
return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> {
|
||||||
|
logger.error("Application exception overridden by rollback exception", ex);
|
||||||
|
if (ex2 instanceof TransactionSystemException) {
|
||||||
|
((TransactionSystemException) ex2).initApplicationException(ex);
|
||||||
|
}
|
||||||
|
return ex2;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// We don't roll back on this exception.
|
||||||
|
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
|
||||||
|
return txInfo.getTransactionManager().commit(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> {
|
||||||
|
logger.error("Application exception overridden by commit exception", ex);
|
||||||
|
if (ex2 instanceof TransactionSystemException) {
|
||||||
|
((TransactionSystemException) ex2).initApplicationException(ex);
|
||||||
|
}
|
||||||
|
return ex2;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Opaque object used to hold transaction information for reactive methods.
|
||||||
|
*/
|
||||||
|
private static final class ReactiveTransactionInfo {
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private final ReactiveTransactionManager transactionManager;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private final TransactionAttribute transactionAttribute;
|
||||||
|
|
||||||
|
private final String joinpointIdentification;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private ReactiveTransaction reactiveTransaction;
|
||||||
|
|
||||||
|
public ReactiveTransactionInfo(@Nullable ReactiveTransactionManager transactionManager,
|
||||||
|
@Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) {
|
||||||
|
|
||||||
|
this.transactionManager = transactionManager;
|
||||||
|
this.transactionAttribute = transactionAttribute;
|
||||||
|
this.joinpointIdentification = joinpointIdentification;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReactiveTransactionManager getTransactionManager() {
|
||||||
|
Assert.state(this.transactionManager != null, "No ReactiveTransactionManager set");
|
||||||
|
return this.transactionManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public TransactionAttribute getTransactionAttribute() {
|
||||||
|
return this.transactionAttribute;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a String representation of this joinpoint (usually a Method call)
|
||||||
|
* for use in logging.
|
||||||
|
*/
|
||||||
|
public String getJoinpointIdentification() {
|
||||||
|
return this.joinpointIdentification;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void newReactiveTransaction(@Nullable ReactiveTransaction transaction) {
|
||||||
|
this.reactiveTransaction = transaction;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public ReactiveTransaction getReactiveTransaction() {
|
||||||
|
return this.reactiveTransaction;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return (this.transactionAttribute != null ? this.transactionAttribute.toString() : "No transaction");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,406 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2019 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.transaction.interceptor;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.test.StepVerifier;
|
||||||
|
|
||||||
|
import org.springframework.transaction.CannotCreateTransactionException;
|
||||||
|
import org.springframework.transaction.ReactiveTransaction;
|
||||||
|
import org.springframework.transaction.ReactiveTransactionManager;
|
||||||
|
import org.springframework.transaction.TransactionSystemException;
|
||||||
|
import org.springframework.transaction.UnexpectedRollbackException;
|
||||||
|
import org.springframework.transaction.reactive.TransactionContext;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
import static org.assertj.core.api.Fail.fail;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract support class to test {@link TransactionAspectSupport} with reactive methods.
|
||||||
|
*
|
||||||
|
* @author Mark Paluch
|
||||||
|
* @author Juergen Hoeller
|
||||||
|
*/
|
||||||
|
public abstract class AbstractReactiveTransactionAspectTests {
|
||||||
|
|
||||||
|
protected Method exceptionalMethod;
|
||||||
|
|
||||||
|
protected Method getNameMethod;
|
||||||
|
|
||||||
|
protected Method setNameMethod;
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
exceptionalMethod = TestBean.class.getMethod("exceptional", Throwable.class);
|
||||||
|
getNameMethod = TestBean.class.getMethod("getName");
|
||||||
|
setNameMethod = TestBean.class.getMethod("setName", String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void noTransaction() throws Exception {
|
||||||
|
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
|
||||||
|
|
||||||
|
DefaultTestBean tb = new DefaultTestBean();
|
||||||
|
TransactionAttributeSource tas = new MapTransactionAttributeSource();
|
||||||
|
|
||||||
|
// All the methods in this class use the advised() template method
|
||||||
|
// to obtain a transaction object, configured with the when PlatformTransactionManager
|
||||||
|
// and transaction attribute source
|
||||||
|
TestBean itb = (TestBean) advised(tb, rtm, tas);
|
||||||
|
|
||||||
|
checkReactiveTransaction(false);
|
||||||
|
itb.getName();
|
||||||
|
checkReactiveTransaction(false);
|
||||||
|
|
||||||
|
// expect no calls
|
||||||
|
verifyZeroInteractions(rtm);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that a transaction is created and committed.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void transactionShouldSucceed() throws Exception {
|
||||||
|
TransactionAttribute txatt = new DefaultTransactionAttribute();
|
||||||
|
|
||||||
|
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
|
||||||
|
tas.register(getNameMethod, txatt);
|
||||||
|
|
||||||
|
ReactiveTransaction status = mock(ReactiveTransaction.class);
|
||||||
|
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
|
||||||
|
// expect a transaction
|
||||||
|
when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status));
|
||||||
|
when(rtm.commit(status)).thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
DefaultTestBean tb = new DefaultTestBean();
|
||||||
|
TestBean itb = (TestBean) advised(tb, rtm, tas);
|
||||||
|
|
||||||
|
itb.getName()
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.verifyComplete();
|
||||||
|
|
||||||
|
verify(rtm).commit(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that two transactions are created and committed.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void twoTransactionsShouldSucceed() throws Exception {
|
||||||
|
TransactionAttribute txatt = new DefaultTransactionAttribute();
|
||||||
|
|
||||||
|
MapTransactionAttributeSource tas1 = new MapTransactionAttributeSource();
|
||||||
|
tas1.register(getNameMethod, txatt);
|
||||||
|
MapTransactionAttributeSource tas2 = new MapTransactionAttributeSource();
|
||||||
|
tas2.register(setNameMethod, txatt);
|
||||||
|
|
||||||
|
ReactiveTransaction status = mock(ReactiveTransaction.class);
|
||||||
|
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
|
||||||
|
// expect a transaction
|
||||||
|
when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status));
|
||||||
|
when(rtm.commit(status)).thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
DefaultTestBean tb = new DefaultTestBean();
|
||||||
|
TestBean itb = (TestBean) advised(tb, rtm, new TransactionAttributeSource[] {tas1, tas2});
|
||||||
|
|
||||||
|
itb.getName()
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.verifyComplete();
|
||||||
|
|
||||||
|
Mono.from(itb.setName("myName"))
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.verifyComplete();
|
||||||
|
|
||||||
|
verify(rtm, times(2)).commit(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that a transaction is created and committed.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void transactionShouldSucceedWithNotNew() throws Exception {
|
||||||
|
TransactionAttribute txatt = new DefaultTransactionAttribute();
|
||||||
|
|
||||||
|
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
|
||||||
|
tas.register(getNameMethod, txatt);
|
||||||
|
|
||||||
|
ReactiveTransaction status = mock(ReactiveTransaction.class);
|
||||||
|
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
|
||||||
|
// expect a transaction
|
||||||
|
when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status));
|
||||||
|
when(rtm.commit(status)).thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
DefaultTestBean tb = new DefaultTestBean();
|
||||||
|
TestBean itb = (TestBean) advised(tb, rtm, tas);
|
||||||
|
|
||||||
|
itb.getName()
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.verifyComplete();
|
||||||
|
|
||||||
|
verify(rtm).commit(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void rollbackOnCheckedException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new Exception(), true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void noRollbackOnCheckedException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new Exception(), false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void rollbackOnUncheckedException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new RuntimeException(), true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void noRollbackOnUncheckedException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new RuntimeException(), false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void rollbackOnCheckedExceptionWithRollbackException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new Exception(), true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void noRollbackOnCheckedExceptionWithRollbackException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new Exception(), false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void rollbackOnUncheckedExceptionWithRollbackException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new RuntimeException(), true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void noRollbackOnUncheckedExceptionWithRollbackException() throws Throwable {
|
||||||
|
doTestRollbackOnException(new RuntimeException(), false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that the when exception thrown by the target can produce the
|
||||||
|
* desired behavior with the appropriate transaction attribute.
|
||||||
|
* @param ex exception to be thrown by the target
|
||||||
|
* @param shouldRollback whether this should cause a transaction rollback
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
protected void doTestRollbackOnException(
|
||||||
|
final Exception ex, final boolean shouldRollback, boolean rollbackException) throws Exception {
|
||||||
|
|
||||||
|
TransactionAttribute txatt = new DefaultTransactionAttribute() {
|
||||||
|
@Override
|
||||||
|
public boolean rollbackOn(Throwable t) {
|
||||||
|
assertThat(t).isSameAs(ex);
|
||||||
|
return shouldRollback;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Method m = exceptionalMethod;
|
||||||
|
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
|
||||||
|
tas.register(m, txatt);
|
||||||
|
|
||||||
|
ReactiveTransaction status = mock(ReactiveTransaction.class);
|
||||||
|
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
|
||||||
|
// Gets additional call(s) from TransactionControl
|
||||||
|
|
||||||
|
when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status));
|
||||||
|
|
||||||
|
TransactionSystemException tex = new TransactionSystemException("system exception");
|
||||||
|
if (rollbackException) {
|
||||||
|
if (shouldRollback) {
|
||||||
|
when(rtm.rollback(status)).thenReturn(Mono.error(tex));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
when(rtm.commit(status)).thenReturn(Mono.error(tex));
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
when(rtm.commit(status)).thenReturn(Mono.empty());
|
||||||
|
when(rtm.rollback(status)).thenReturn(Mono.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultTestBean tb = new DefaultTestBean();
|
||||||
|
TestBean itb = (TestBean) advised(tb, rtm, tas);
|
||||||
|
|
||||||
|
itb.exceptional(ex)
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.expectErrorSatisfies(actual -> {
|
||||||
|
|
||||||
|
if (rollbackException) {
|
||||||
|
assertThat(actual).isEqualTo(tex);
|
||||||
|
} else {
|
||||||
|
assertThat(actual).isEqualTo(ex);
|
||||||
|
}
|
||||||
|
}).verify();
|
||||||
|
|
||||||
|
if (!rollbackException) {
|
||||||
|
if (shouldRollback) {
|
||||||
|
verify(rtm).rollback(status);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
verify(rtm).commit(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate a transaction infrastructure failure.
|
||||||
|
* Shouldn't invoke target method.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void cannotCreateTransaction() throws Exception {
|
||||||
|
TransactionAttribute txatt = new DefaultTransactionAttribute();
|
||||||
|
|
||||||
|
Method m = getNameMethod;
|
||||||
|
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
|
||||||
|
tas.register(m, txatt);
|
||||||
|
|
||||||
|
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
|
||||||
|
// Expect a transaction
|
||||||
|
CannotCreateTransactionException ex = new CannotCreateTransactionException("foobar", null);
|
||||||
|
when(rtm.getReactiveTransaction(txatt)).thenThrow(ex);
|
||||||
|
|
||||||
|
DefaultTestBean tb = new DefaultTestBean() {
|
||||||
|
@Override
|
||||||
|
public Mono<String> getName() {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Shouldn't have invoked target method when couldn't create transaction for transactional method");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
TestBean itb = (TestBean) advised(tb, rtm, tas);
|
||||||
|
|
||||||
|
itb.getName()
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.expectError(CannotCreateTransactionException.class)
|
||||||
|
.verify();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate failure of the underlying transaction infrastructure to commit.
|
||||||
|
* Check that the target method was invoked, but that the transaction
|
||||||
|
* infrastructure exception was thrown to the client
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void cannotCommitTransaction() throws Exception {
|
||||||
|
TransactionAttribute txatt = new DefaultTransactionAttribute();
|
||||||
|
|
||||||
|
Method m = setNameMethod;
|
||||||
|
MapTransactionAttributeSource tas = new MapTransactionAttributeSource();
|
||||||
|
tas.register(m, txatt);
|
||||||
|
// Method m2 = getNameMethod;
|
||||||
|
// No attributes for m2
|
||||||
|
|
||||||
|
ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class);
|
||||||
|
|
||||||
|
ReactiveTransaction status = mock(ReactiveTransaction.class);
|
||||||
|
when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status));
|
||||||
|
UnexpectedRollbackException ex = new UnexpectedRollbackException("foobar", null);
|
||||||
|
when(rtm.commit(status)).thenReturn(Mono.error(ex));
|
||||||
|
|
||||||
|
DefaultTestBean tb = new DefaultTestBean();
|
||||||
|
TestBean itb = (TestBean) advised(tb, rtm, tas);
|
||||||
|
|
||||||
|
String name = "new name";
|
||||||
|
|
||||||
|
Mono.from(itb.setName(name))
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.expectError(UnexpectedRollbackException.class)
|
||||||
|
.verify();
|
||||||
|
|
||||||
|
// Should have invoked target and changed name
|
||||||
|
|
||||||
|
itb.getName()
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.expectNext(name)
|
||||||
|
.verifyComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkReactiveTransaction(boolean expected) {
|
||||||
|
Mono.subscriberContext().handle((context, sink) -> {
|
||||||
|
if (context.hasKey(TransactionContext.class) != expected){
|
||||||
|
fail("Should have thrown NoTransactionException");
|
||||||
|
}
|
||||||
|
sink.complete();
|
||||||
|
}).block();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected Object advised(
|
||||||
|
Object target, ReactiveTransactionManager rtm, TransactionAttributeSource[] tas) throws Exception {
|
||||||
|
|
||||||
|
return advised(target, rtm, new CompositeTransactionAttributeSource(tas));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subclasses must implement this to create an advised object based on the
|
||||||
|
* when target. In the case of AspectJ, the advised object will already
|
||||||
|
* have been created, as there's no distinction between target and proxy.
|
||||||
|
* In the case of Spring's own AOP framework, a proxy must be created
|
||||||
|
* using a suitably configured transaction interceptor
|
||||||
|
* @param target target if there's a distinct target. If not (AspectJ),
|
||||||
|
* return target.
|
||||||
|
* @return transactional advised object
|
||||||
|
*/
|
||||||
|
protected abstract Object advised(
|
||||||
|
Object target, ReactiveTransactionManager rtm, TransactionAttributeSource tas) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
public interface TestBean {
|
||||||
|
|
||||||
|
Mono<Void> exceptional(Throwable t);
|
||||||
|
|
||||||
|
Mono<String> getName();
|
||||||
|
|
||||||
|
Publisher<Void> setName(String name);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public class DefaultTestBean implements TestBean {
|
||||||
|
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<String> getName() {
|
||||||
|
return Mono.justOrEmpty(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Void> setName(String name) {
|
||||||
|
return Mono.fromRunnable(() -> this.name = name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Void> exceptional(Throwable t) {
|
||||||
|
if (t != null) {
|
||||||
|
return Mono.error(t);
|
||||||
|
}
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,112 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2019 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.transaction.interceptor;
|
||||||
|
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import org.springframework.aop.framework.ProxyFactory;
|
||||||
|
import org.springframework.beans.factory.BeanFactory;
|
||||||
|
import org.springframework.transaction.ReactiveTransactionManager;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for {@link TransactionInterceptor} with reactive methods.
|
||||||
|
*
|
||||||
|
* @author Mark Paluch
|
||||||
|
*/
|
||||||
|
public class ReactiveTransactionInterceptorTests extends AbstractReactiveTransactionAspectTests {
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Object advised(Object target, ReactiveTransactionManager ptm, TransactionAttributeSource[] tas) {
|
||||||
|
TransactionInterceptor ti = new TransactionInterceptor();
|
||||||
|
ti.setTransactionManager(ptm);
|
||||||
|
ti.setTransactionAttributeSources(tas);
|
||||||
|
|
||||||
|
ProxyFactory pf = new ProxyFactory(target);
|
||||||
|
pf.addAdvice(0, ti);
|
||||||
|
return pf.getProxy();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Template method to create an advised object given the
|
||||||
|
* target object and transaction setup.
|
||||||
|
* Creates a TransactionInterceptor and applies it.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected Object advised(Object target, ReactiveTransactionManager ptm, TransactionAttributeSource tas) {
|
||||||
|
TransactionInterceptor ti = new TransactionInterceptor();
|
||||||
|
ti.setTransactionManager(ptm);
|
||||||
|
|
||||||
|
assertThat(ti.getTransactionManager()).isEqualTo(ptm);
|
||||||
|
ti.setTransactionAttributeSource(tas);
|
||||||
|
assertThat(ti.getTransactionAttributeSource()).isEqualTo(tas);
|
||||||
|
|
||||||
|
ProxyFactory pf = new ProxyFactory(target);
|
||||||
|
pf.addAdvice(0, ti);
|
||||||
|
return pf.getProxy();
|
||||||
|
}
|
||||||
|
|
||||||
|
private TransactionInterceptor createTransactionInterceptor(BeanFactory beanFactory,
|
||||||
|
String transactionManagerName, ReactiveTransactionManager transactionManager) {
|
||||||
|
|
||||||
|
TransactionInterceptor ti = new TransactionInterceptor();
|
||||||
|
if (beanFactory != null) {
|
||||||
|
ti.setBeanFactory(beanFactory);
|
||||||
|
}
|
||||||
|
if (transactionManagerName != null) {
|
||||||
|
ti.setTransactionManagerBeanName(transactionManagerName);
|
||||||
|
|
||||||
|
}
|
||||||
|
if (transactionManager != null) {
|
||||||
|
ti.setTransactionManager(transactionManager);
|
||||||
|
}
|
||||||
|
ti.setTransactionAttributeSource(new NameMatchTransactionAttributeSource());
|
||||||
|
ti.afterPropertiesSet();
|
||||||
|
return ti;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TransactionInterceptor transactionInterceptorWithTransactionManager(
|
||||||
|
ReactiveTransactionManager transactionManager, BeanFactory beanFactory) {
|
||||||
|
|
||||||
|
return createTransactionInterceptor(beanFactory, null, transactionManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TransactionInterceptor transactionInterceptorWithTransactionManagerName(
|
||||||
|
String transactionManagerName, BeanFactory beanFactory) {
|
||||||
|
|
||||||
|
return createTransactionInterceptor(beanFactory, transactionManagerName, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TransactionInterceptor simpleTransactionInterceptor(BeanFactory beanFactory) {
|
||||||
|
return createTransactionInterceptor(beanFactory, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReactiveTransactionManager associateTransactionManager(BeanFactory beanFactory, String name) {
|
||||||
|
ReactiveTransactionManager transactionManager = mock(ReactiveTransactionManager.class);
|
||||||
|
when(beanFactory.containsBean(name)).thenReturn(true);
|
||||||
|
when(beanFactory.getBean(name, ReactiveTransactionManager.class)).thenReturn(transactionManager);
|
||||||
|
return transactionManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue