Add Coroutines support to Spring AOP
This commit adds support for Kotlin Coroutines to Spring AOP by leveraging CoroutinesUtils#invokeSuspendingFunction in AopUtils#invokeJoinpointUsingReflection to convert it to the equivalent Publisher return value, like in other parts of Spring Framework. That allows method interceptors with Reactive support to process related return values. CglibAopProxy#processReturnType and JdkDynamicAopProxy#invoke take care of the conversion from the Publisher return value to Kotlin Coroutines. Reactive transactional and HTTP service interface support have been refined to leverage those new generic capabilities. Closes gh-22462
This commit is contained in:
parent
9b3f4567ee
commit
c8169e5cad
|
@ -1,5 +1,6 @@
|
|||
plugins {
|
||||
id 'org.springframework.build.runtimehints-agent'
|
||||
id 'kotlin'
|
||||
}
|
||||
|
||||
description = "Spring Integration Tests"
|
||||
|
@ -26,6 +27,7 @@ dependencies {
|
|||
testImplementation("org.aspectj:aspectjweaver")
|
||||
testImplementation("org.hsqldb:hsqldb")
|
||||
testImplementation("org.hibernate:hibernate-core-jakarta")
|
||||
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
|
||||
}
|
||||
|
||||
normalization {
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
package org.springframework.aop.framework.autoproxy
|
||||
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.aopalliance.intercept.MethodInterceptor
|
||||
import org.aopalliance.intercept.MethodInvocation
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig
|
||||
import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.context.annotation.EnableAspectJAutoProxy
|
||||
import org.springframework.test.annotation.DirtiesContext
|
||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
|
||||
import reactor.core.publisher.Mono
|
||||
import java.lang.reflect.Method
|
||||
|
||||
|
||||
/**
|
||||
* Integration tests for interceptors with Kotlin (with and without Coroutines) configured
|
||||
* via AspectJ auto-proxy support.
|
||||
*/
|
||||
@SpringJUnitConfig(InterceptorConfig::class)
|
||||
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
|
||||
class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
||||
@Autowired val echo: Echo,
|
||||
@Autowired val firstAdvisor: TestPointcutAdvisor,
|
||||
@Autowired val secondAdvisor: TestPointcutAdvisor) {
|
||||
|
||||
@Test
|
||||
fun `Multiple interceptors with regular function`() {
|
||||
assertThat(firstAdvisor.interceptor.invocations).isEmpty()
|
||||
assertThat(secondAdvisor.interceptor.invocations).isEmpty()
|
||||
val value = "Hello!"
|
||||
assertThat(echo.echo(value)).isEqualTo(value)
|
||||
assertThat(firstAdvisor.interceptor.invocations).singleElement().matches { String::class.java.isAssignableFrom(it) }
|
||||
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { String::class.java.isAssignableFrom(it) }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Multiple interceptors with suspending function`() {
|
||||
assertThat(firstAdvisor.interceptor.invocations).isEmpty()
|
||||
assertThat(secondAdvisor.interceptor.invocations).isEmpty()
|
||||
val value = "Hello!"
|
||||
runBlocking {
|
||||
assertThat(echo.suspendingEcho(value)).isEqualTo(value)
|
||||
}
|
||||
assertThat(firstAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
|
||||
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableAspectJAutoProxy
|
||||
open class InterceptorConfig {
|
||||
|
||||
@Bean
|
||||
open fun firstAdvisor() = TestPointcutAdvisor().apply { order = 0 }
|
||||
|
||||
@Bean
|
||||
open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 }
|
||||
|
||||
|
||||
@Bean
|
||||
open fun echo(): Echo {
|
||||
return Echo()
|
||||
}
|
||||
}
|
||||
|
||||
class TestMethodInterceptor: MethodInterceptor {
|
||||
|
||||
var invocations: MutableList<Class<*>> = mutableListOf()
|
||||
|
||||
@Suppress("RedundantNullableReturnType")
|
||||
override fun invoke(invocation: MethodInvocation): Any? {
|
||||
val result = invocation.proceed()
|
||||
invocations.add(result!!.javaClass)
|
||||
return result
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class TestPointcutAdvisor : StaticMethodMatcherPointcutAdvisor(TestMethodInterceptor()) {
|
||||
|
||||
val interceptor: TestMethodInterceptor
|
||||
get() = advice as TestMethodInterceptor
|
||||
|
||||
override fun matches(method: Method, targetClass: Class<*>): Boolean {
|
||||
return targetClass == Echo::class.java && method.name.lowercase().endsWith("echo")
|
||||
}
|
||||
}
|
||||
|
||||
open class Echo {
|
||||
|
||||
open fun echo(value: String): String {
|
||||
return value;
|
||||
}
|
||||
|
||||
open suspend fun suspendingEcho(value: String): String {
|
||||
delay(1)
|
||||
return value;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1,14 +1,18 @@
|
|||
description = "Spring AOP"
|
||||
|
||||
apply plugin: "kotlin"
|
||||
|
||||
dependencies {
|
||||
api(project(":spring-beans"))
|
||||
api(project(":spring-core"))
|
||||
optional("org.apache.commons:commons-pool2")
|
||||
optional("org.aspectj:aspectjweaver")
|
||||
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
|
||||
testFixturesImplementation(testFixtures(project(":spring-beans")))
|
||||
testFixturesImplementation(testFixtures(project(":spring-core")))
|
||||
testFixturesImplementation("com.google.code.findbugs:jsr305")
|
||||
testImplementation(project(":spring-core-test"))
|
||||
testImplementation(testFixtures(project(":spring-beans")))
|
||||
testImplementation(testFixtures(project(":spring-core")))
|
||||
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.springframework.cglib.proxy.MethodInterceptor;
|
|||
import org.springframework.cglib.proxy.MethodProxy;
|
||||
import org.springframework.cglib.proxy.NoOp;
|
||||
import org.springframework.core.KotlinDetector;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.SmartClassLoader;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
@ -75,6 +76,7 @@ import org.springframework.util.ReflectionUtils;
|
|||
* @author Ramnivas Laddad
|
||||
* @author Chris Beams
|
||||
* @author Dave Syer
|
||||
* @author Sebastien Deleuze
|
||||
* @see org.springframework.cglib.proxy.Enhancer
|
||||
* @see AdvisedSupport#setProxyTargetClass
|
||||
* @see DefaultAopProxyFactory
|
||||
|
@ -98,6 +100,8 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
/** Keeps track of the Classes that we have validated for final methods. */
|
||||
private static final Map<Class<?>, Boolean> validatedClasses = new WeakHashMap<>();
|
||||
|
||||
private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";
|
||||
|
||||
|
||||
/** The configuration used to configure this proxy. */
|
||||
protected final AdvisedSupport advised;
|
||||
|
@ -399,10 +403,11 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
/**
|
||||
* Process a return value. Wraps a return of {@code this} if necessary to be the
|
||||
* {@code proxy} and also verifies that {@code null} is not returned as a primitive.
|
||||
* Also takes care of the conversion from {@code Mono} to Kotlin Coroutines if needed.
|
||||
*/
|
||||
@Nullable
|
||||
private static Object processReturnType(
|
||||
Object proxy, @Nullable Object target, Method method, @Nullable Object returnValue) {
|
||||
Object proxy, @Nullable Object target, Method method, Object[] arguments, @Nullable Object returnValue) {
|
||||
|
||||
// Massage return value if necessary
|
||||
if (returnValue != null && returnValue == target &&
|
||||
|
@ -416,6 +421,11 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
throw new AopInvocationException(
|
||||
"Null return value from advice does not match primitive return type for: " + method);
|
||||
}
|
||||
if (KotlinDetector.isSuspendingFunction(method)) {
|
||||
return COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()) ?
|
||||
CoroutinesUtils.asFlow(returnValue) :
|
||||
CoroutinesUtils.awaitSingleOrNull(returnValue, arguments[arguments.length - 1]);
|
||||
}
|
||||
return returnValue;
|
||||
}
|
||||
|
||||
|
@ -446,7 +456,7 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
@Nullable
|
||||
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
|
||||
Object retVal = AopUtils.invokeJoinpointUsingReflection(this.target, method, args);
|
||||
return processReturnType(proxy, this.target, method, retVal);
|
||||
return processReturnType(proxy, this.target, method, args, retVal);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -471,7 +481,7 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
try {
|
||||
oldProxy = AopContext.setCurrentProxy(proxy);
|
||||
Object retVal = AopUtils.invokeJoinpointUsingReflection(this.target, method, args);
|
||||
return processReturnType(proxy, this.target, method, retVal);
|
||||
return processReturnType(proxy, this.target, method, args, retVal);
|
||||
}
|
||||
finally {
|
||||
AopContext.setCurrentProxy(oldProxy);
|
||||
|
@ -499,7 +509,7 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
Object target = this.targetSource.getTarget();
|
||||
try {
|
||||
Object retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args);
|
||||
return processReturnType(proxy, target, method, retVal);
|
||||
return processReturnType(proxy, target, method, args, retVal);
|
||||
}
|
||||
finally {
|
||||
if (target != null) {
|
||||
|
@ -529,7 +539,7 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
try {
|
||||
oldProxy = AopContext.setCurrentProxy(proxy);
|
||||
Object retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args);
|
||||
return processReturnType(proxy, target, method, retVal);
|
||||
return processReturnType(proxy, target, method, args, retVal);
|
||||
}
|
||||
finally {
|
||||
AopContext.setCurrentProxy(oldProxy);
|
||||
|
@ -656,7 +666,7 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
proxy, this.target, method, args, this.targetClass, this.adviceChain, methodProxy);
|
||||
// If we get here, we need to create a MethodInvocation.
|
||||
Object retVal = invocation.proceed();
|
||||
retVal = processReturnType(proxy, this.target, method, retVal);
|
||||
retVal = processReturnType(proxy, this.target, method, args, retVal);
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
@ -706,7 +716,7 @@ class CglibAopProxy implements AopProxy, Serializable {
|
|||
// We need to create a method invocation...
|
||||
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
|
||||
}
|
||||
return processReturnType(proxy, target, method, retVal);
|
||||
return processReturnType(proxy, target, method, args, retVal);
|
||||
}
|
||||
finally {
|
||||
if (target != null && !targetSource.isStatic()) {
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.aop.framework;
|
||||
|
||||
import kotlin.coroutines.Continuation;
|
||||
import kotlinx.coroutines.reactive.ReactiveFlowKt;
|
||||
import kotlinx.coroutines.reactor.MonoKt;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
/**
|
||||
* Package-visible class designed to avoid a hard dependency on Kotlin and Coroutines dependency at runtime.
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @since 6.1.0
|
||||
*/
|
||||
abstract class CoroutinesUtils {
|
||||
|
||||
static Object asFlow(Object publisher) {
|
||||
return ReactiveFlowKt.asFlow((Publisher<?>) publisher);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
static Object awaitSingleOrNull(Object mono, Object continuation) {
|
||||
return MonoKt.awaitSingleOrNull((Mono<?>) mono, (Continuation<Object>) continuation);
|
||||
}
|
||||
|
||||
}
|
|
@ -31,6 +31,8 @@ import org.springframework.aop.RawTargetAccess;
|
|||
import org.springframework.aop.TargetSource;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.core.DecoratingProxy;
|
||||
import org.springframework.core.KotlinDetector;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
@ -58,6 +60,7 @@ import org.springframework.util.ClassUtils;
|
|||
* @author Rob Harrop
|
||||
* @author Dave Syer
|
||||
* @author Sergey Tsypanov
|
||||
* @author Sebastien Deleuze
|
||||
* @see java.lang.reflect.Proxy
|
||||
* @see AdvisedSupport
|
||||
* @see ProxyFactory
|
||||
|
@ -80,6 +83,8 @@ final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializa
|
|||
/** We use a static Log to avoid serialization issues. */
|
||||
private static final Log logger = LogFactory.getLog(JdkDynamicAopProxy.class);
|
||||
|
||||
private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";
|
||||
|
||||
/** Config used to configure this proxy. */
|
||||
private final AdvisedSupport advised;
|
||||
|
||||
|
@ -258,6 +263,10 @@ final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializa
|
|||
throw new AopInvocationException(
|
||||
"Null return value from advice does not match primitive return type for: " + method);
|
||||
}
|
||||
if (KotlinDetector.isSuspendingFunction(method)) {
|
||||
return COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()) ?
|
||||
CoroutinesUtils.asFlow(retVal) : CoroutinesUtils.awaitSingleOrNull(retVal, args[args.length - 1]);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
finally {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2022 the original author or authors.
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -25,6 +25,11 @@ import java.util.LinkedHashSet;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import kotlin.coroutines.Continuation;
|
||||
import kotlin.coroutines.CoroutineContext;
|
||||
import kotlinx.coroutines.Job;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
import org.springframework.aop.Advisor;
|
||||
import org.springframework.aop.AopInvocationException;
|
||||
import org.springframework.aop.IntroductionAdvisor;
|
||||
|
@ -35,6 +40,8 @@ import org.springframework.aop.PointcutAdvisor;
|
|||
import org.springframework.aop.SpringProxy;
|
||||
import org.springframework.aop.TargetClassAware;
|
||||
import org.springframework.core.BridgeMethodResolver;
|
||||
import org.springframework.core.CoroutinesUtils;
|
||||
import org.springframework.core.KotlinDetector;
|
||||
import org.springframework.core.MethodIntrospector;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
@ -53,6 +60,7 @@ import org.springframework.util.ReflectionUtils;
|
|||
* @author Rod Johnson
|
||||
* @author Juergen Hoeller
|
||||
* @author Rob Harrop
|
||||
* @author Sebastien Deleuze
|
||||
* @see org.springframework.aop.framework.AopProxyUtils
|
||||
*/
|
||||
public abstract class AopUtils {
|
||||
|
@ -340,7 +348,8 @@ public abstract class AopUtils {
|
|||
// Use reflection to invoke the method.
|
||||
try {
|
||||
ReflectionUtils.makeAccessible(method);
|
||||
return method.invoke(target, args);
|
||||
return KotlinDetector.isSuspendingFunction(method) ?
|
||||
KotlinDelegate.invokeSuspendingFunction(method, target, args) : method.invoke(target, args);
|
||||
}
|
||||
catch (InvocationTargetException ex) {
|
||||
// Invoked method threw a checked exception.
|
||||
|
@ -356,4 +365,17 @@ public abstract class AopUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inner class to avoid a hard dependency on Kotlin at runtime.
|
||||
*/
|
||||
private static class KotlinDelegate {
|
||||
|
||||
public static Publisher<?> invokeSuspendingFunction(Method method, Object target, Object... args) {
|
||||
Continuation<?> continuation = (Continuation<?>) args[args.length -1];
|
||||
CoroutineContext context = continuation.getContext().minusKey(Job.Key);
|
||||
return CoroutinesUtils.invokeSuspendingFunction(context, method, target, args);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -29,12 +29,14 @@ import org.springframework.aop.testfixture.interceptor.NopInterceptor;
|
|||
import org.springframework.beans.testfixture.beans.TestBean;
|
||||
import org.springframework.core.testfixture.io.SerializationTestUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Rod Johnson
|
||||
* @author Chris Beams
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
public class AopUtilsTests {
|
||||
|
||||
|
@ -88,4 +90,13 @@ public class AopUtilsTests {
|
|||
assertThat(SerializationTestUtils.serializeAndDeserialize(ExposeInvocationInterceptor.INSTANCE)).isSameAs(ExposeInvocationInterceptor.INSTANCE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvokeJoinpointUsingReflection() throws Throwable {
|
||||
String name = "foo";
|
||||
TestBean testBean = new TestBean(name);
|
||||
Method method = ReflectionUtils.findMethod(TestBean.class, "getName");
|
||||
Object result = AopUtils.invokeJoinpointUsingReflection(testBean, method, new Object[0]);
|
||||
assertThat(result).isEqualTo(name);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.aop.support
|
||||
|
||||
import kotlinx.coroutines.CoroutineName
|
||||
import kotlinx.coroutines.delay
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.springframework.util.ReflectionUtils
|
||||
import reactor.core.publisher.Mono
|
||||
import kotlin.coroutines.Continuation
|
||||
|
||||
/**
|
||||
* Tests for Kotlin support in [AopUtils].
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
class AopUtilsKotlinTests {
|
||||
|
||||
@Test
|
||||
fun `Invoking suspending function should return Mono`() {
|
||||
val value = "foo"
|
||||
val method = ReflectionUtils.findMethod(AopUtilsKotlinTests::class.java, "suspendingFunction",
|
||||
String::class.java, Continuation::class.java)!!
|
||||
val continuation = Continuation<Any>(CoroutineName("test")) { }
|
||||
val result = AopUtils.invokeJoinpointUsingReflection(this, method, arrayOf(value, continuation))
|
||||
assertThat(result).isInstanceOfSatisfying(Mono::class.java) {
|
||||
assertThat(it.block()).isEqualTo(value)
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
suspend fun suspendingFunction(value: String): String {
|
||||
delay(1)
|
||||
return value;
|
||||
}
|
||||
|
||||
}
|
|
@ -26,8 +26,6 @@ import io.vavr.control.Try;
|
|||
import kotlin.coroutines.Continuation;
|
||||
import kotlin.coroutines.CoroutineContext;
|
||||
import kotlinx.coroutines.Job;
|
||||
import kotlinx.coroutines.reactive.ReactiveFlowKt;
|
||||
import kotlinx.coroutines.reactor.MonoKt;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
@ -370,12 +368,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
if (corInv != null) {
|
||||
callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv);
|
||||
}
|
||||
Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
|
||||
if (corInv != null) {
|
||||
return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow((Publisher<?>) result) :
|
||||
KotlinDelegate.awaitSingleOrNull((Mono<?>) result, corInv.getContinuation()));
|
||||
}
|
||||
return result;
|
||||
return txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
|
||||
}
|
||||
|
||||
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
|
||||
|
@ -896,16 +889,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
*/
|
||||
private static class KotlinDelegate {
|
||||
|
||||
private static Object asFlow(Publisher<?> publisher) {
|
||||
return ReactiveFlowKt.asFlow(publisher);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
private static Object awaitSingleOrNull(Mono<?> publisher, Object continuation) {
|
||||
return MonoKt.awaitSingleOrNull(publisher, (Continuation<Object>) continuation);
|
||||
}
|
||||
|
||||
public static Publisher<?> invokeSuspendingFunction(Method method, CoroutinesInvocationCallback callback) {
|
||||
CoroutineContext coroutineContext = ((Continuation<?>) callback.getContinuation()).getContext().minusKey(Job.Key);
|
||||
return CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, callback.getTarget(), callback.getArguments());
|
||||
|
|
|
@ -25,11 +25,8 @@ import java.util.Map;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import kotlin.coroutines.Continuation;
|
||||
import kotlinx.coroutines.reactor.MonoKt;
|
||||
import org.aopalliance.intercept.MethodInterceptor;
|
||||
import org.aopalliance.intercept.MethodInvocation;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.aop.framework.ProxyFactory;
|
||||
import org.springframework.aop.framework.ReflectiveMethodInvocation;
|
||||
|
@ -301,10 +298,9 @@ public final class HttpServiceProxyFactory {
|
|||
Method method = invocation.getMethod();
|
||||
HttpServiceMethod httpServiceMethod = this.httpServiceMethods.get(method);
|
||||
if (httpServiceMethod != null) {
|
||||
if (KotlinDetector.isSuspendingFunction(method)) {
|
||||
return KotlinDelegate.invokeSuspendingFunction(invocation, httpServiceMethod);
|
||||
}
|
||||
return httpServiceMethod.invoke(invocation.getArguments());
|
||||
Object[] arguments = KotlinDetector.isSuspendingFunction(method) ?
|
||||
resolveCoroutinesArguments(invocation.getArguments()) : invocation.getArguments();
|
||||
return httpServiceMethod.invoke(arguments);
|
||||
}
|
||||
if (method.isDefault()) {
|
||||
if (invocation instanceof ReflectiveMethodInvocation reflectiveMethodInvocation) {
|
||||
|
@ -314,27 +310,13 @@ public final class HttpServiceProxyFactory {
|
|||
}
|
||||
throw new IllegalStateException("Unexpected method invocation: " + method);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inner class to avoid a hard dependency on Kotlin at runtime.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static class KotlinDelegate {
|
||||
|
||||
public static Object invokeSuspendingFunction(MethodInvocation invocation, HttpServiceMethod httpServiceMethod) {
|
||||
Object[] rawArguments = invocation.getArguments();
|
||||
Object[] arguments = resolveArguments(rawArguments);
|
||||
Continuation<Object> continuation = (Continuation<Object>) rawArguments[rawArguments.length - 1];
|
||||
Mono<Object> wrapped = (Mono<Object>) httpServiceMethod.invoke(arguments);
|
||||
return MonoKt.awaitSingleOrNull(wrapped, continuation);
|
||||
}
|
||||
|
||||
private static Object[] resolveArguments(Object[] args) {
|
||||
private static Object[] resolveCoroutinesArguments(Object[] args) {
|
||||
Object[] functionArgs = new Object[args.length - 1];
|
||||
System.arraycopy(args, 0, functionArgs, 0, args.length - 1);
|
||||
return functionArgs;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue