Merge branch '6.1.x'
This commit is contained in:
commit
0c319a89d7
|
@ -20,6 +20,9 @@ import kotlinx.coroutines.delay
|
|||
import kotlinx.coroutines.runBlocking
|
||||
import org.aopalliance.intercept.MethodInterceptor
|
||||
import org.aopalliance.intercept.MethodInvocation
|
||||
import org.aspectj.lang.ProceedingJoinPoint
|
||||
import org.aspectj.lang.annotation.Around
|
||||
import org.aspectj.lang.annotation.Aspect
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig
|
||||
|
@ -28,10 +31,18 @@ import org.springframework.beans.factory.annotation.Autowired
|
|||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.context.annotation.EnableAspectJAutoProxy
|
||||
import org.springframework.stereotype.Component
|
||||
import org.springframework.test.annotation.DirtiesContext
|
||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
|
||||
import org.springframework.transaction.annotation.EnableTransactionManagement
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import org.springframework.transaction.testfixture.ReactiveCallCountingTransactionManager
|
||||
import reactor.core.publisher.Mono
|
||||
import java.lang.reflect.Method
|
||||
import kotlin.annotation.AnnotationTarget.ANNOTATION_CLASS
|
||||
import kotlin.annotation.AnnotationTarget.CLASS
|
||||
import kotlin.annotation.AnnotationTarget.FUNCTION
|
||||
import kotlin.annotation.AnnotationTarget.TYPE
|
||||
|
||||
|
||||
/**
|
||||
|
@ -43,7 +54,9 @@ import java.lang.reflect.Method
|
|||
class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
||||
@Autowired val echo: Echo,
|
||||
@Autowired val firstAdvisor: TestPointcutAdvisor,
|
||||
@Autowired val secondAdvisor: TestPointcutAdvisor) {
|
||||
@Autowired val secondAdvisor: TestPointcutAdvisor,
|
||||
@Autowired val countingAspect: CountingAspect,
|
||||
@Autowired val reactiveTransactionManager: ReactiveCallCountingTransactionManager) {
|
||||
|
||||
@Test
|
||||
fun `Multiple interceptors with regular function`() {
|
||||
|
@ -67,8 +80,22 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
|||
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
|
||||
}
|
||||
|
||||
@Test // gh-33095
|
||||
fun `Aspect and reactive transactional with suspending function`() {
|
||||
assertThat(countingAspect.counter).isZero()
|
||||
assertThat(reactiveTransactionManager.commits).isZero()
|
||||
val value = "Hello!"
|
||||
runBlocking {
|
||||
assertThat(echo.suspendingTransactionalEcho(value)).isEqualTo(value)
|
||||
}
|
||||
assertThat(countingAspect.counter).`as`("aspect applied").isOne()
|
||||
assertThat(reactiveTransactionManager.begun).isOne()
|
||||
assertThat(reactiveTransactionManager.commits).`as`("transactional applied").isOne()
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableAspectJAutoProxy
|
||||
@EnableTransactionManagement
|
||||
open class InterceptorConfig {
|
||||
|
||||
@Bean
|
||||
|
@ -77,6 +104,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
|||
@Bean
|
||||
open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 }
|
||||
|
||||
@Bean
|
||||
open fun countingAspect() = CountingAspect()
|
||||
|
||||
@Bean
|
||||
open fun transactionManager(): ReactiveCallCountingTransactionManager {
|
||||
return ReactiveCallCountingTransactionManager()
|
||||
}
|
||||
|
||||
@Bean
|
||||
open fun echo(): Echo {
|
||||
|
@ -107,6 +141,24 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
|||
}
|
||||
}
|
||||
|
||||
@Target(CLASS, FUNCTION, ANNOTATION_CLASS, TYPE)
|
||||
@Retention(AnnotationRetention.RUNTIME)
|
||||
annotation class Counting()
|
||||
|
||||
@Aspect
|
||||
@Component
|
||||
class CountingAspect {
|
||||
|
||||
var counter: Long = 0
|
||||
|
||||
@Around("@annotation(org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.Counting)")
|
||||
fun logging(joinPoint: ProceedingJoinPoint): Any {
|
||||
return (joinPoint.proceed(joinPoint.args) as Mono<*>).doOnTerminate {
|
||||
counter++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
open class Echo {
|
||||
|
||||
open fun echo(value: String): String {
|
||||
|
@ -118,6 +170,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
|
|||
return value
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Counting
|
||||
open suspend fun suspendingTransactionalEcho(value: String): String {
|
||||
delay(1)
|
||||
return value
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,12 +23,8 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.Future;
|
||||
|
||||
import io.vavr.control.Try;
|
||||
import kotlin.coroutines.Continuation;
|
||||
import kotlin.coroutines.CoroutineContext;
|
||||
import kotlinx.coroutines.Job;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
|
@ -37,7 +33,6 @@ import org.springframework.beans.factory.BeanFactoryAware;
|
|||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
|
||||
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||
import org.springframework.core.CoroutinesUtils;
|
||||
import org.springframework.core.KotlinDetector;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.NamedThreadLocal;
|
||||
|
@ -356,10 +351,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
|
||||
boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
|
||||
COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
|
||||
if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
|
||||
throw new IllegalStateException("Coroutines invocation not supported: " + method);
|
||||
}
|
||||
CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);
|
||||
|
||||
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
|
||||
Class<?> reactiveType =
|
||||
|
@ -372,11 +363,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
return new ReactiveTransactionSupport(adapter);
|
||||
});
|
||||
|
||||
InvocationCallback callback = invocation;
|
||||
if (corInv != null) {
|
||||
callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv);
|
||||
}
|
||||
return txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
|
||||
return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, rtm);
|
||||
}
|
||||
|
||||
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
|
||||
|
@ -864,22 +851,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Coroutines-supporting extension of the callback interface.
|
||||
*/
|
||||
protected interface CoroutinesInvocationCallback extends InvocationCallback {
|
||||
|
||||
Object getTarget();
|
||||
|
||||
Object[] getArguments();
|
||||
|
||||
default Object getContinuation() {
|
||||
Object[] args = getArguments();
|
||||
return args[args.length - 1];
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Internal holder class for a Throwable in a callback transaction model.
|
||||
*/
|
||||
|
@ -928,18 +899,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inner class to avoid a hard dependency on Kotlin at runtime.
|
||||
*/
|
||||
private static class KotlinDelegate {
|
||||
|
||||
public static Publisher<?> invokeSuspendingFunction(Method method, CoroutinesInvocationCallback callback) {
|
||||
CoroutineContext coroutineContext = ((Continuation<?>) callback.getContinuation()).getContext().minusKey(Job.Key);
|
||||
return CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, callback.getTarget(), callback.getArguments());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Delegate for Reactor-based management of transactional methods with a
|
||||
|
|
|
@ -116,21 +116,7 @@ public class TransactionInterceptor extends TransactionAspectSupport implements
|
|||
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
|
||||
|
||||
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
|
||||
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
|
||||
@Override
|
||||
@Nullable
|
||||
public Object proceedWithInvocation() throws Throwable {
|
||||
return invocation.proceed();
|
||||
}
|
||||
@Override
|
||||
public Object getTarget() {
|
||||
return invocation.getThis();
|
||||
}
|
||||
@Override
|
||||
public Object[] getArguments() {
|
||||
return invocation.getArguments();
|
||||
}
|
||||
});
|
||||
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue