Move coroutines invocation decision to invokeWithinTransaction
See gh-26092
This commit is contained in:
parent
7206a23d33
commit
51b1306d70
|
@ -34,6 +34,7 @@ import org.springframework.beans.factory.BeanFactory;
|
|||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||
import org.springframework.core.CoroutinesUtils;
|
||||
import org.springframework.core.KotlinDetector;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.NamedThreadLocal;
|
||||
|
@ -342,9 +343,16 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
|
||||
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
|
||||
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
|
||||
boolean hasSuspendingFlowReturnType = isSuspendingFunction && COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
|
||||
boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
|
||||
COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
|
||||
if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
|
||||
throw new IllegalStateException("Coroutines invocation not supported: " + method);
|
||||
}
|
||||
CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);
|
||||
|
||||
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
|
||||
Class<?> reactiveType = (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
|
||||
Class<?> reactiveType =
|
||||
(isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
|
||||
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
|
||||
if (adapter == null) {
|
||||
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
|
||||
|
@ -352,9 +360,18 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
}
|
||||
return new ReactiveTransactionSupport(adapter);
|
||||
});
|
||||
Object result = txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
|
||||
return (isSuspendingFunction ? (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow((Publisher<?>) result) :
|
||||
KotlinDelegate.awaitSingleOrNull((Publisher<?>) result, ((CoroutinesInvocationCallback) invocation).getContinuation())) : result);
|
||||
|
||||
InvocationCallback callback = invocation;
|
||||
if (corInv != null) {
|
||||
callback = () -> CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());
|
||||
}
|
||||
Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager) tm);
|
||||
if (corInv != null) {
|
||||
Publisher<?> pr = (Publisher<?>) result;
|
||||
return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) :
|
||||
KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
|
||||
|
@ -789,9 +806,20 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
Object proceedWithInvocation() throws Throwable;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Coroutines-supporting extension of the callback interface.
|
||||
*/
|
||||
protected interface CoroutinesInvocationCallback extends InvocationCallback {
|
||||
|
||||
Object getContinuation();
|
||||
Object getTarget();
|
||||
|
||||
Object[] getArguments();
|
||||
|
||||
default Object getContinuation() {
|
||||
Object[] args = getArguments();
|
||||
return args[args.length - 1];
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -876,7 +904,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
|
||||
|
||||
// For Mono and suspending functions not returning kotlinx.coroutines.flow.Flow
|
||||
if (Mono.class.isAssignableFrom(method.getReturnType()) || (KotlinDetector.isSuspendingFunction(method) && !COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) {
|
||||
if (Mono.class.isAssignableFrom(method.getReturnType()) || (KotlinDetector.isSuspendingFunction(method) &&
|
||||
!COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) {
|
||||
|
||||
return TransactionContextManager.currentContext().flatMap(context ->
|
||||
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> {
|
||||
try {
|
||||
|
|
|
@ -27,8 +27,6 @@ import org.aopalliance.intercept.MethodInvocation;
|
|||
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.core.CoroutinesUtils;
|
||||
import org.springframework.core.KotlinDetector;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionManager;
|
||||
|
@ -118,20 +116,21 @@ public class TransactionInterceptor extends TransactionAspectSupport implements
|
|||
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
|
||||
|
||||
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
|
||||
if (KotlinDetector.isSuspendingFunction(invocation.getMethod())) {
|
||||
InvocationCallback callback = new CoroutinesInvocationCallback() {
|
||||
@Override
|
||||
public Object proceedWithInvocation() {
|
||||
return CoroutinesUtils.invokeSuspendingFunction(invocation.getMethod(), invocation.getThis(), invocation.getArguments());
|
||||
}
|
||||
@Override
|
||||
public Object getContinuation() {
|
||||
return invocation.getArguments()[invocation.getArguments().length - 1];
|
||||
}
|
||||
};
|
||||
return invokeWithinTransaction(invocation.getMethod(), targetClass, callback);
|
||||
}
|
||||
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
|
||||
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
|
||||
@Override
|
||||
@Nullable
|
||||
public Object proceedWithInvocation() throws Throwable {
|
||||
return invocation.proceed();
|
||||
}
|
||||
@Override
|
||||
public Object getTarget() {
|
||||
return invocation.getThis();
|
||||
}
|
||||
@Override
|
||||
public Object[] getArguments() {
|
||||
return invocation.getArguments();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue