From 51b1306d7051164716e69c78df01ba330ce0e211 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Mon, 16 Nov 2020 17:41:09 +0100 Subject: [PATCH] Move coroutines invocation decision to invokeWithinTransaction See gh-26092 --- .../interceptor/TransactionAspectSupport.java | 44 ++++++++++++++++--- .../interceptor/TransactionInterceptor.java | 31 +++++++------ 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index 1ecef7f733..18088595ea 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -34,6 +34,7 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; +import org.springframework.core.CoroutinesUtils; import org.springframework.core.KotlinDetector; import org.springframework.core.MethodParameter; import org.springframework.core.NamedThreadLocal; @@ -342,9 +343,16 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method); - boolean hasSuspendingFlowReturnType = isSuspendingFunction && COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()); + boolean hasSuspendingFlowReturnType = isSuspendingFunction && + COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()); + if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) { + throw new IllegalStateException("Coroutines invocation not supported: " + method); + } + CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null); + ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { - Class reactiveType = (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType()); + Class reactiveType = + (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType()); ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType); if (adapter == null) { throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + @@ -352,9 +360,18 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init } return new ReactiveTransactionSupport(adapter); }); - Object result = txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm); - return (isSuspendingFunction ? (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow((Publisher) result) : - KotlinDelegate.awaitSingleOrNull((Publisher) result, ((CoroutinesInvocationCallback) invocation).getContinuation())) : result); + + InvocationCallback callback = invocation; + if (corInv != null) { + callback = () -> CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments()); + } + Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager) tm); + if (corInv != null) { + Publisher pr = (Publisher) result; + return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) : + KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation())); + } + return result; } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); @@ -789,9 +806,20 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init Object proceedWithInvocation() throws Throwable; } + + /** + * Coroutines-supporting extension of the callback interface. + */ protected interface CoroutinesInvocationCallback extends InvocationCallback { - Object getContinuation(); + Object getTarget(); + + Object[] getArguments(); + + default Object getContinuation() { + Object[] args = getArguments(); + return args[args.length - 1]; + } } @@ -876,7 +904,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // For Mono and suspending functions not returning kotlinx.coroutines.flow.Flow - if (Mono.class.isAssignableFrom(method.getReturnType()) || (KotlinDetector.isSuspendingFunction(method) && !COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) { + if (Mono.class.isAssignableFrom(method.getReturnType()) || (KotlinDetector.isSuspendingFunction(method) && + !COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) { + return TransactionContextManager.currentContext().flatMap(context -> createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> { try { diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java index a6373d2a05..788c1f2519 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java @@ -27,8 +27,6 @@ import org.aopalliance.intercept.MethodInvocation; import org.springframework.aop.support.AopUtils; import org.springframework.beans.factory.BeanFactory; -import org.springframework.core.CoroutinesUtils; -import org.springframework.core.KotlinDetector; import org.springframework.lang.Nullable; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionManager; @@ -118,20 +116,21 @@ public class TransactionInterceptor extends TransactionAspectSupport implements Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... - if (KotlinDetector.isSuspendingFunction(invocation.getMethod())) { - InvocationCallback callback = new CoroutinesInvocationCallback() { - @Override - public Object proceedWithInvocation() { - return CoroutinesUtils.invokeSuspendingFunction(invocation.getMethod(), invocation.getThis(), invocation.getArguments()); - } - @Override - public Object getContinuation() { - return invocation.getArguments()[invocation.getArguments().length - 1]; - } - }; - return invokeWithinTransaction(invocation.getMethod(), targetClass, callback); - } - return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); + return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { + @Override + @Nullable + public Object proceedWithInvocation() throws Throwable { + return invocation.proceed(); + } + @Override + public Object getTarget() { + return invocation.getThis(); + } + @Override + public Object[] getArguments() { + return invocation.getArguments(); + } + }); }