Consider target transaction manager for reactive transaction decision
Closes gh-23832
This commit is contained in:
parent
cef4478b7b
commit
43a86565ca
|
@ -173,7 +173,11 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
@Nullable
|
||||
private BeanFactory beanFactory;
|
||||
|
||||
private final ConcurrentMap<Object, Object> transactionManagerCache = new ConcurrentReferenceHashMap<>(4);
|
||||
private final ConcurrentMap<Object, TransactionManager> transactionManagerCache =
|
||||
new ConcurrentReferenceHashMap<>(4);
|
||||
|
||||
private final ConcurrentMap<Method, ReactiveTransactionSupport> transactionSupportCache =
|
||||
new ConcurrentReferenceHashMap<>(1024);
|
||||
|
||||
|
||||
protected TransactionAspectSupport() {
|
||||
|
@ -301,7 +305,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
if (getTransactionManager() == null && this.beanFactory == null) {
|
||||
throw new IllegalStateException(
|
||||
"Set the 'transactionManager' property or make sure to run within a BeanFactory " +
|
||||
"containing a PlatformTransactionManager bean!");
|
||||
"containing a TransactionManager bean!");
|
||||
}
|
||||
if (getTransactionAttributeSource() == null) {
|
||||
throw new IllegalStateException(
|
||||
|
@ -325,26 +329,35 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
|
||||
final InvocationCallback invocation) throws Throwable {
|
||||
|
||||
if (this.reactiveAdapterRegistry != null) {
|
||||
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
|
||||
throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: "
|
||||
+ method + ". Use TransactionalOperator.transactional extensions instead.");
|
||||
}
|
||||
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
|
||||
if (adapter != null) {
|
||||
return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation);
|
||||
}
|
||||
}
|
||||
|
||||
// If the transaction attribute is null, the method is non-transactional.
|
||||
TransactionAttributeSource tas = getTransactionAttributeSource();
|
||||
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
|
||||
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
|
||||
final TransactionManager tm = determineTransactionManager(txAttr);
|
||||
|
||||
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
|
||||
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
|
||||
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
|
||||
throw new TransactionUsageException(
|
||||
"Unsupported annotated transaction on suspending function detected: " + method +
|
||||
". Use TransactionalOperator.transactional extensions instead.");
|
||||
}
|
||||
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
|
||||
if (adapter == null) {
|
||||
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
|
||||
method.getReturnType());
|
||||
}
|
||||
return new ReactiveTransactionSupport(adapter);
|
||||
});
|
||||
return txSupport.invokeWithinTransaction(
|
||||
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
|
||||
}
|
||||
|
||||
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
|
||||
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
|
||||
|
||||
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
|
||||
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
|
||||
// Standard transaction demarcation with getTransaction and commit/rollback calls.
|
||||
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
|
||||
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
|
||||
|
||||
Object retVal;
|
||||
try {
|
||||
|
@ -378,8 +391,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
|
||||
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
|
||||
try {
|
||||
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
|
||||
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
|
||||
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
|
||||
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
|
||||
try {
|
||||
Object retVal = invocation.proceedWithInvocation();
|
||||
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
|
||||
|
@ -446,10 +459,10 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
* Determine the specific transaction manager to use for the given transaction.
|
||||
*/
|
||||
@Nullable
|
||||
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
|
||||
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
|
||||
// Do not attempt to lookup tx manager if no tx attributes are set
|
||||
if (txAttr == null || this.beanFactory == null) {
|
||||
return asPlatformTransactionManager(getTransactionManager());
|
||||
return getTransactionManager();
|
||||
}
|
||||
|
||||
String qualifier = txAttr.getQualifier();
|
||||
|
@ -460,12 +473,11 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
|
||||
}
|
||||
else {
|
||||
PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager());
|
||||
TransactionManager defaultTransactionManager = getTransactionManager();
|
||||
if (defaultTransactionManager == null) {
|
||||
defaultTransactionManager = asPlatformTransactionManager(
|
||||
this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
|
||||
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
|
||||
if (defaultTransactionManager == null) {
|
||||
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
|
||||
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
|
||||
this.transactionManagerCache.putIfAbsent(
|
||||
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
|
||||
}
|
||||
|
@ -474,11 +486,11 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
}
|
||||
}
|
||||
|
||||
private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
|
||||
PlatformTransactionManager txManager = asPlatformTransactionManager(this.transactionManagerCache.get(qualifier));
|
||||
private TransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
|
||||
TransactionManager txManager = this.transactionManagerCache.get(qualifier);
|
||||
if (txManager == null) {
|
||||
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
|
||||
beanFactory, PlatformTransactionManager.class, qualifier);
|
||||
beanFactory, TransactionManager.class, qualifier);
|
||||
this.transactionManagerCache.putIfAbsent(qualifier, txManager);
|
||||
}
|
||||
return txManager;
|
||||
|
@ -841,33 +853,30 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, InvocationCallback invocation) {
|
||||
// If the transaction attribute is null, the method is non-transactional.
|
||||
TransactionAttributeSource tas = getTransactionAttributeSource();
|
||||
TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
|
||||
ReactiveTransactionManager tm = determineTransactionManager(txAttr);
|
||||
public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
|
||||
InvocationCallback invocation, @Nullable TransactionAttribute txAttr, ReactiveTransactionManager rtm) {
|
||||
|
||||
String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
|
||||
|
||||
// Optimize for Mono
|
||||
if (Mono.class.isAssignableFrom(method.getReturnType())) {
|
||||
return TransactionContextManager.currentContext().flatMap(context ->
|
||||
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> {
|
||||
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> {
|
||||
try {
|
||||
// Need re-wrapping until we get hold of the exception through usingWhen.
|
||||
return Mono
|
||||
.<Object, ReactiveTransactionInfo>usingWhen(
|
||||
Mono.just(it),
|
||||
txInfo -> {
|
||||
try {
|
||||
return (Mono<?>) invocation.proceedWithInvocation();
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
},
|
||||
this::commitTransactionAfterReturning,
|
||||
(txInfo, err) -> Mono.empty(),
|
||||
this::commitTransactionAfterReturning)
|
||||
return Mono.<Object, ReactiveTransactionInfo>usingWhen(
|
||||
Mono.just(it),
|
||||
txInfo -> {
|
||||
try {
|
||||
return (Mono<?>) invocation.proceedWithInvocation();
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
},
|
||||
this::commitTransactionAfterReturning,
|
||||
(txInfo, err) -> Mono.empty(),
|
||||
this::commitTransactionAfterReturning)
|
||||
.onErrorResume(ex ->
|
||||
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
|
||||
}
|
||||
|
@ -881,7 +890,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
|
||||
// Any other reactive type, typically a Flux
|
||||
return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context ->
|
||||
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> {
|
||||
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> {
|
||||
try {
|
||||
// Need re-wrapping until we get hold of the exception through usingWhen.
|
||||
return Flux
|
||||
|
@ -909,58 +918,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()));
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private ReactiveTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
|
||||
// Do not attempt to lookup tx manager if no tx attributes are set
|
||||
if (txAttr == null || beanFactory == null) {
|
||||
return asReactiveTransactionManager(getTransactionManager());
|
||||
}
|
||||
|
||||
String qualifier = txAttr.getQualifier();
|
||||
if (StringUtils.hasText(qualifier)) {
|
||||
return determineQualifiedTransactionManager(beanFactory, qualifier);
|
||||
}
|
||||
else if (StringUtils.hasText(transactionManagerBeanName)) {
|
||||
return determineQualifiedTransactionManager(beanFactory, transactionManagerBeanName);
|
||||
}
|
||||
else {
|
||||
ReactiveTransactionManager defaultTransactionManager = asReactiveTransactionManager(getTransactionManager());
|
||||
if (defaultTransactionManager == null) {
|
||||
defaultTransactionManager = asReactiveTransactionManager(
|
||||
transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
|
||||
if (defaultTransactionManager == null) {
|
||||
defaultTransactionManager = beanFactory.getBean(ReactiveTransactionManager.class);
|
||||
transactionManagerCache.putIfAbsent(
|
||||
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
|
||||
}
|
||||
}
|
||||
return defaultTransactionManager;
|
||||
}
|
||||
}
|
||||
|
||||
private ReactiveTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
|
||||
ReactiveTransactionManager txManager = asReactiveTransactionManager(transactionManagerCache.get(qualifier));
|
||||
if (txManager == null) {
|
||||
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
|
||||
beanFactory, ReactiveTransactionManager.class, qualifier);
|
||||
transactionManagerCache.putIfAbsent(qualifier, txManager);
|
||||
}
|
||||
return txManager;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private ReactiveTransactionManager asReactiveTransactionManager(@Nullable Object transactionManager) {
|
||||
if (transactionManager == null || transactionManager instanceof ReactiveTransactionManager) {
|
||||
return (ReactiveTransactionManager) transactionManager;
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException(
|
||||
"Specified transaction manager is not a ReactiveTransactionManager: " + transactionManager);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
private Mono<ReactiveTransactionInfo> createTransactionIfNecessary(@Nullable ReactiveTransactionManager tm,
|
||||
private Mono<ReactiveTransactionInfo> createTransactionIfNecessary(ReactiveTransactionManager tm,
|
||||
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
|
||||
|
||||
// If no name specified, apply method identification as transaction name.
|
||||
|
@ -972,21 +931,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
}
|
||||
};
|
||||
}
|
||||
TransactionAttribute attrToUse = txAttr;
|
||||
|
||||
Mono<ReactiveTransaction> tx = Mono.empty();
|
||||
if (txAttr != null) {
|
||||
if (tm != null) {
|
||||
tx = tm.getReactiveTransaction(txAttr);
|
||||
}
|
||||
else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
|
||||
"] because no transaction manager has been configured");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final TransactionAttribute attrToUse = txAttr;
|
||||
Mono<ReactiveTransaction> tx = (attrToUse != null ? tm.getReactiveTransaction(attrToUse) : Mono.empty());
|
||||
return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it)).switchIfEmpty(
|
||||
Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null))));
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.springframework.lang.Nullable;
|
|||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.TransactionException;
|
||||
import org.springframework.transaction.TransactionManager;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.util.SerializationTestUtils;
|
||||
|
||||
|
@ -42,12 +43,13 @@ import static org.mockito.Mockito.verify;
|
|||
* Mock object based tests for TransactionInterceptor.
|
||||
*
|
||||
* @author Rod Johnson
|
||||
* @author Juergen Hoeller
|
||||
* @since 16.03.2003
|
||||
*/
|
||||
public class TransactionInterceptorTests extends AbstractTransactionAspectTests {
|
||||
|
||||
@Override
|
||||
protected Object advised(Object target, PlatformTransactionManager ptm, TransactionAttributeSource[] tas) throws Exception {
|
||||
protected Object advised(Object target, PlatformTransactionManager ptm, TransactionAttributeSource[] tas) {
|
||||
TransactionInterceptor ti = new TransactionInterceptor();
|
||||
ti.setTransactionManager(ptm);
|
||||
ti.setTransactionAttributeSources(tas);
|
||||
|
@ -214,14 +216,14 @@ public class TransactionInterceptorTests extends AbstractTransactionAspectTests
|
|||
|
||||
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
|
||||
attribute.setQualifier("fooTransactionManager");
|
||||
PlatformTransactionManager actual = ti.determineTransactionManager(attribute);
|
||||
TransactionManager actual = ti.determineTransactionManager(attribute);
|
||||
assertThat(actual).isSameAs(txManager);
|
||||
|
||||
// Call again, should be cached
|
||||
PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute);
|
||||
TransactionManager actual2 = ti.determineTransactionManager(attribute);
|
||||
assertThat(actual2).isSameAs(txManager);
|
||||
verify(beanFactory, times(1)).containsBean("fooTransactionManager");
|
||||
verify(beanFactory, times(1)).getBean("fooTransactionManager", PlatformTransactionManager.class);
|
||||
verify(beanFactory, times(1)).getBean("fooTransactionManager", TransactionManager.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -233,13 +235,13 @@ public class TransactionInterceptorTests extends AbstractTransactionAspectTests
|
|||
PlatformTransactionManager txManager = associateTransactionManager(beanFactory, "fooTransactionManager");
|
||||
|
||||
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
|
||||
PlatformTransactionManager actual = ti.determineTransactionManager(attribute);
|
||||
TransactionManager actual = ti.determineTransactionManager(attribute);
|
||||
assertThat(actual).isSameAs(txManager);
|
||||
|
||||
// Call again, should be cached
|
||||
PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute);
|
||||
TransactionManager actual2 = ti.determineTransactionManager(attribute);
|
||||
assertThat(actual2).isSameAs(txManager);
|
||||
verify(beanFactory, times(1)).getBean("fooTransactionManager", PlatformTransactionManager.class);
|
||||
verify(beanFactory, times(1)).getBean("fooTransactionManager", TransactionManager.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -248,16 +250,16 @@ public class TransactionInterceptorTests extends AbstractTransactionAspectTests
|
|||
TransactionInterceptor ti = simpleTransactionInterceptor(beanFactory);
|
||||
|
||||
PlatformTransactionManager txManager = mock(PlatformTransactionManager.class);
|
||||
given(beanFactory.getBean(PlatformTransactionManager.class)).willReturn(txManager);
|
||||
given(beanFactory.getBean(TransactionManager.class)).willReturn(txManager);
|
||||
|
||||
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
|
||||
PlatformTransactionManager actual = ti.determineTransactionManager(attribute);
|
||||
TransactionManager actual = ti.determineTransactionManager(attribute);
|
||||
assertThat(actual).isSameAs(txManager);
|
||||
|
||||
// Call again, should be cached
|
||||
PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute);
|
||||
TransactionManager actual2 = ti.determineTransactionManager(attribute);
|
||||
assertThat(actual2).isSameAs(txManager);
|
||||
verify(beanFactory, times(1)).getBean(PlatformTransactionManager.class);
|
||||
verify(beanFactory, times(1)).getBean(TransactionManager.class);
|
||||
}
|
||||
|
||||
|
||||
|
@ -299,7 +301,7 @@ public class TransactionInterceptorTests extends AbstractTransactionAspectTests
|
|||
private PlatformTransactionManager associateTransactionManager(BeanFactory beanFactory, String name) {
|
||||
PlatformTransactionManager transactionManager = mock(PlatformTransactionManager.class);
|
||||
given(beanFactory.containsBean(name)).willReturn(true);
|
||||
given(beanFactory.getBean(name, PlatformTransactionManager.class)).willReturn(transactionManager);
|
||||
given(beanFactory.getBean(name, TransactionManager.class)).willReturn(transactionManager);
|
||||
return transactionManager;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue