diff --git a/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt b/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt index cf535ad9f4e..47fbba4213d 100644 --- a/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt +++ b/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt @@ -20,6 +20,9 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.aopalliance.intercept.MethodInterceptor import org.aopalliance.intercept.MethodInvocation +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.Around +import org.aspectj.lang.annotation.Aspect import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig @@ -28,10 +31,18 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.EnableAspectJAutoProxy +import org.springframework.stereotype.Component import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.junit.jupiter.SpringJUnitConfig +import org.springframework.transaction.annotation.EnableTransactionManagement +import org.springframework.transaction.annotation.Transactional +import org.springframework.transaction.testfixture.ReactiveCallCountingTransactionManager import reactor.core.publisher.Mono import java.lang.reflect.Method +import kotlin.annotation.AnnotationTarget.ANNOTATION_CLASS +import kotlin.annotation.AnnotationTarget.CLASS +import kotlin.annotation.AnnotationTarget.FUNCTION +import kotlin.annotation.AnnotationTarget.TYPE /** @@ -43,7 +54,9 @@ import java.lang.reflect.Method class AspectJAutoProxyInterceptorKotlinIntegrationTests( @Autowired val echo: Echo, @Autowired val firstAdvisor: TestPointcutAdvisor, - @Autowired val secondAdvisor: TestPointcutAdvisor) { + @Autowired val secondAdvisor: TestPointcutAdvisor, + @Autowired val countingAspect: CountingAspect, + @Autowired val reactiveTransactionManager: ReactiveCallCountingTransactionManager) { @Test fun `Multiple interceptors with regular function`() { @@ -67,8 +80,22 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) } } + @Test // gh-33095 + fun `Aspect and reactive transactional with suspending function`() { + assertThat(countingAspect.counter).isZero() + assertThat(reactiveTransactionManager.commits).isZero() + val value = "Hello!" + runBlocking { + assertThat(echo.suspendingTransactionalEcho(value)).isEqualTo(value) + } + assertThat(countingAspect.counter).`as`("aspect applied").isOne() + assertThat(reactiveTransactionManager.begun).isOne() + assertThat(reactiveTransactionManager.commits).`as`("transactional applied").isOne() + } + @Configuration @EnableAspectJAutoProxy + @EnableTransactionManagement open class InterceptorConfig { @Bean @@ -77,6 +104,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( @Bean open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 } + @Bean + open fun countingAspect() = CountingAspect() + + @Bean + open fun transactionManager(): ReactiveCallCountingTransactionManager { + return ReactiveCallCountingTransactionManager() + } @Bean open fun echo(): Echo { @@ -107,6 +141,24 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( } } + @Target(CLASS, FUNCTION, ANNOTATION_CLASS, TYPE) + @Retention(AnnotationRetention.RUNTIME) + annotation class Counting() + + @Aspect + @Component + class CountingAspect { + + var counter: Long = 0 + + @Around("@annotation(org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.Counting)") + fun logging(joinPoint: ProceedingJoinPoint): Any { + return (joinPoint.proceed(joinPoint.args) as Mono<*>).doOnTerminate { + counter++ + } + } + } + open class Echo { open fun echo(value: String): String { @@ -118,6 +170,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( return value } + @Transactional + @Counting + open suspend fun suspendingTransactionalEcho(value: String): String { + delay(1) + return value + } + } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index a02fd664cf9..c644c976f56 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -23,12 +23,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import io.vavr.control.Try; -import kotlin.coroutines.Continuation; -import kotlin.coroutines.CoroutineContext; -import kotlinx.coroutines.Job; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -37,7 +33,6 @@ import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; -import org.springframework.core.CoroutinesUtils; import org.springframework.core.KotlinDetector; import org.springframework.core.MethodParameter; import org.springframework.core.NamedThreadLocal; @@ -356,10 +351,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method); boolean hasSuspendingFlowReturnType = isSuspendingFunction && COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()); - if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) { - throw new IllegalStateException("Coroutines invocation not supported: " + method); - } - CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null); ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { Class reactiveType = @@ -372,11 +363,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init return new ReactiveTransactionSupport(adapter); }); - InvocationCallback callback = invocation; - if (corInv != null) { - callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv); - } - return txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm); + return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, rtm); } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); @@ -864,22 +851,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init } - /** - * Coroutines-supporting extension of the callback interface. - */ - protected interface CoroutinesInvocationCallback extends InvocationCallback { - - Object getTarget(); - - Object[] getArguments(); - - default Object getContinuation() { - Object[] args = getArguments(); - return args[args.length - 1]; - } - } - - /** * Internal holder class for a Throwable in a callback transaction model. */ @@ -928,18 +899,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init } } - /** - * Inner class to avoid a hard dependency on Kotlin at runtime. - */ - private static class KotlinDelegate { - - public static Publisher invokeSuspendingFunction(Method method, CoroutinesInvocationCallback callback) { - CoroutineContext coroutineContext = ((Continuation) callback.getContinuation()).getContext().minusKey(Job.Key); - return CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, callback.getTarget(), callback.getArguments()); - } - - } - /** * Delegate for Reactor-based management of transactional methods with a diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java index 788c1f25199..863c23506cd 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java @@ -116,21 +116,7 @@ public class TransactionInterceptor extends TransactionAspectSupport implements Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... - return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { - @Override - @Nullable - public Object proceedWithInvocation() throws Throwable { - return invocation.proceed(); - } - @Override - public Object getTarget() { - return invocation.getThis(); - } - @Override - public Object[] getArguments() { - return invocation.getArguments(); - } - }); + return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }