Make coroutines with custom AOP aspects work with `@Transactional`
Previous to this change, the transactional aspect would supersed the
user-defined AspectJ aspect, shortcircuiting to calling the original
Kotlin suspending function.
This change simplifies the TransactionAspectSupport way of dealing with
transactional coroutines, thanks to the fact that lower level support
for AOP has been introduced in c8169e5c.
Closes gh-33095
This commit is contained in:
parent
3ccaefe38f
commit
1d890a8952
|
|
@ -20,6 +20,9 @@ import kotlinx.coroutines.delay
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import org.aopalliance.intercept.MethodInterceptor
|
import org.aopalliance.intercept.MethodInterceptor
|
||||||
import org.aopalliance.intercept.MethodInvocation
|
import org.aopalliance.intercept.MethodInvocation
|
||||||
|
import org.aspectj.lang.ProceedingJoinPoint
|
||||||
|
import org.aspectj.lang.annotation.Around
|
||||||
|
import org.aspectj.lang.annotation.Aspect
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig
|
import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig
|
||||||
|
|
@ -28,10 +31,18 @@ import org.springframework.beans.factory.annotation.Autowired
|
||||||
import org.springframework.context.annotation.Bean
|
import org.springframework.context.annotation.Bean
|
||||||
import org.springframework.context.annotation.Configuration
|
import org.springframework.context.annotation.Configuration
|
||||||
import org.springframework.context.annotation.EnableAspectJAutoProxy
|
import org.springframework.context.annotation.EnableAspectJAutoProxy
|
||||||
|
import org.springframework.stereotype.Component
|
||||||
import org.springframework.test.annotation.DirtiesContext
|
import org.springframework.test.annotation.DirtiesContext
|
||||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
|
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
|
||||||
|
import org.springframework.transaction.annotation.EnableTransactionManagement
|
||||||
|
import org.springframework.transaction.annotation.Transactional
|
||||||
|
import org.springframework.transaction.testfixture.ReactiveCallCountingTransactionManager
|
||||||
import reactor.core.publisher.Mono
|
import reactor.core.publisher.Mono
|
||||||
import java.lang.reflect.Method
|
import java.lang.reflect.Method
|
||||||
|
import kotlin.annotation.AnnotationTarget.ANNOTATION_CLASS
|
||||||
|
import kotlin.annotation.AnnotationTarget.CLASS
|
||||||
|
import kotlin.annotation.AnnotationTarget.FUNCTION
|
||||||
|
import kotlin.annotation.AnnotationTarget.TYPE
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -43,7 +54,9 @@ import java.lang.reflect.Method
|
||||||
class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
||||||
@Autowired val echo: Echo,
|
@Autowired val echo: Echo,
|
||||||
@Autowired val firstAdvisor: TestPointcutAdvisor,
|
@Autowired val firstAdvisor: TestPointcutAdvisor,
|
||||||
@Autowired val secondAdvisor: TestPointcutAdvisor) {
|
@Autowired val secondAdvisor: TestPointcutAdvisor,
|
||||||
|
@Autowired val countingAspect: CountingAspect,
|
||||||
|
@Autowired val reactiveTransactionManager: ReactiveCallCountingTransactionManager) {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `Multiple interceptors with regular function`() {
|
fun `Multiple interceptors with regular function`() {
|
||||||
|
|
@ -67,8 +80,22 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
||||||
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
|
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test // gh-33095
|
||||||
|
fun `Aspect and reactive transactional with suspending function`() {
|
||||||
|
assertThat(countingAspect.counter).isZero()
|
||||||
|
assertThat(reactiveTransactionManager.commits).isZero()
|
||||||
|
val value = "Hello!"
|
||||||
|
runBlocking {
|
||||||
|
assertThat(echo.suspendingTransactionalEcho(value)).isEqualTo(value)
|
||||||
|
}
|
||||||
|
assertThat(countingAspect.counter).`as`("aspect applied").isOne()
|
||||||
|
assertThat(reactiveTransactionManager.begun).isOne()
|
||||||
|
assertThat(reactiveTransactionManager.commits).`as`("transactional applied").isOne()
|
||||||
|
}
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@EnableAspectJAutoProxy
|
@EnableAspectJAutoProxy
|
||||||
|
@EnableTransactionManagement
|
||||||
open class InterceptorConfig {
|
open class InterceptorConfig {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|
@ -77,6 +104,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
||||||
@Bean
|
@Bean
|
||||||
open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 }
|
open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 }
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
open fun countingAspect() = CountingAspect()
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
open fun transactionManager(): ReactiveCallCountingTransactionManager {
|
||||||
|
return ReactiveCallCountingTransactionManager()
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
open fun echo(): Echo {
|
open fun echo(): Echo {
|
||||||
|
|
@ -107,6 +141,24 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Target(CLASS, FUNCTION, ANNOTATION_CLASS, TYPE)
|
||||||
|
@Retention(AnnotationRetention.RUNTIME)
|
||||||
|
annotation class Counting()
|
||||||
|
|
||||||
|
@Aspect
|
||||||
|
@Component
|
||||||
|
class CountingAspect {
|
||||||
|
|
||||||
|
var counter: Long = 0
|
||||||
|
|
||||||
|
@Around("@annotation(org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.Counting)")
|
||||||
|
fun logging(joinPoint: ProceedingJoinPoint): Any {
|
||||||
|
return (joinPoint.proceed(joinPoint.args) as Mono<*>).doOnTerminate {
|
||||||
|
counter++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
open class Echo {
|
open class Echo {
|
||||||
|
|
||||||
open fun echo(value: String): String {
|
open fun echo(value: String): String {
|
||||||
|
|
@ -118,6 +170,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
||||||
return value
|
return value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
@Counting
|
||||||
|
open suspend fun suspendingTransactionalEcho(value: String): String {
|
||||||
|
delay(1)
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,12 +23,8 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import io.vavr.control.Try;
|
import io.vavr.control.Try;
|
||||||
import kotlin.coroutines.Continuation;
|
|
||||||
import kotlin.coroutines.CoroutineContext;
|
|
||||||
import kotlinx.coroutines.Job;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.reactivestreams.Publisher;
|
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
|
@ -36,7 +32,6 @@ import org.springframework.beans.factory.BeanFactory;
|
||||||
import org.springframework.beans.factory.BeanFactoryAware;
|
import org.springframework.beans.factory.BeanFactoryAware;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||||
import org.springframework.core.CoroutinesUtils;
|
|
||||||
import org.springframework.core.KotlinDetector;
|
import org.springframework.core.KotlinDetector;
|
||||||
import org.springframework.core.MethodParameter;
|
import org.springframework.core.MethodParameter;
|
||||||
import org.springframework.core.NamedThreadLocal;
|
import org.springframework.core.NamedThreadLocal;
|
||||||
|
|
@ -355,10 +350,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
|
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
|
||||||
boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
|
boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
|
||||||
COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
|
COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
|
||||||
if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
|
|
||||||
throw new IllegalStateException("Coroutines invocation not supported: " + method);
|
|
||||||
}
|
|
||||||
CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);
|
|
||||||
|
|
||||||
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
|
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
|
||||||
Class<?> reactiveType =
|
Class<?> reactiveType =
|
||||||
|
|
@ -371,11 +362,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
return new ReactiveTransactionSupport(adapter);
|
return new ReactiveTransactionSupport(adapter);
|
||||||
});
|
});
|
||||||
|
|
||||||
InvocationCallback callback = invocation;
|
return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, rtm);
|
||||||
if (corInv != null) {
|
|
||||||
callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv);
|
|
||||||
}
|
|
||||||
return txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
|
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
|
||||||
|
|
@ -829,22 +816,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Coroutines-supporting extension of the callback interface.
|
|
||||||
*/
|
|
||||||
protected interface CoroutinesInvocationCallback extends InvocationCallback {
|
|
||||||
|
|
||||||
Object getTarget();
|
|
||||||
|
|
||||||
Object[] getArguments();
|
|
||||||
|
|
||||||
default Object getContinuation() {
|
|
||||||
Object[] args = getArguments();
|
|
||||||
return args[args.length - 1];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal holder class for a Throwable in a callback transaction model.
|
* Internal holder class for a Throwable in a callback transaction model.
|
||||||
*/
|
*/
|
||||||
|
|
@ -891,18 +862,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Inner class to avoid a hard dependency on Kotlin at runtime.
|
|
||||||
*/
|
|
||||||
private static class KotlinDelegate {
|
|
||||||
|
|
||||||
public static Publisher<?> invokeSuspendingFunction(Method method, CoroutinesInvocationCallback callback) {
|
|
||||||
CoroutineContext coroutineContext = ((Continuation<?>) callback.getContinuation()).getContext().minusKey(Job.Key);
|
|
||||||
return CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, callback.getTarget(), callback.getArguments());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delegate for Reactor-based management of transactional methods with a
|
* Delegate for Reactor-based management of transactional methods with a
|
||||||
|
|
|
||||||
|
|
@ -116,21 +116,7 @@ public class TransactionInterceptor extends TransactionAspectSupport implements
|
||||||
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
|
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
|
||||||
|
|
||||||
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
|
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
|
||||||
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
|
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
|
||||||
@Override
|
|
||||||
@Nullable
|
|
||||||
public Object proceedWithInvocation() throws Throwable {
|
|
||||||
return invocation.proceed();
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public Object getTarget() {
|
|
||||||
return invocation.getThis();
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public Object[] getArguments() {
|
|
||||||
return invocation.getArguments();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue