Common code in AsyncExecutionAspectSupport allows for CompletableFuture processing with AspectJ as well
Issue: SPR-13128
This commit is contained in:
parent
e134e3e51b
commit
dd4bc630c3
|
@ -18,9 +18,13 @@ package org.springframework.aop.interceptor;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.CompletionException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -31,9 +35,12 @@ import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||||
import org.springframework.core.task.AsyncListenableTaskExecutor;
|
import org.springframework.core.task.AsyncListenableTaskExecutor;
|
||||||
import org.springframework.core.task.AsyncTaskExecutor;
|
import org.springframework.core.task.AsyncTaskExecutor;
|
||||||
import org.springframework.core.task.support.TaskExecutorAdapter;
|
import org.springframework.core.task.support.TaskExecutorAdapter;
|
||||||
|
import org.springframework.lang.UsesJava8;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
import org.springframework.util.ClassUtils;
|
||||||
import org.springframework.util.ReflectionUtils;
|
import org.springframework.util.ReflectionUtils;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
import org.springframework.util.concurrent.ListenableFuture;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for asynchronous method execution aspects, such as
|
* Base class for asynchronous method execution aspects, such as
|
||||||
|
@ -52,6 +59,11 @@ import org.springframework.util.StringUtils;
|
||||||
*/
|
*/
|
||||||
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
|
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
|
||||||
|
|
||||||
|
// Java 8's CompletableFuture type present?
|
||||||
|
private static final boolean completableFuturePresent = ClassUtils.isPresent(
|
||||||
|
"java.util.concurrent.CompletableFuture", AsyncExecutionInterceptor.class.getClassLoader());
|
||||||
|
|
||||||
|
|
||||||
protected final Log logger = LogFactory.getLog(getClass());
|
protected final Log logger = LogFactory.getLog(getClass());
|
||||||
|
|
||||||
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<Method, AsyncTaskExecutor>(16);
|
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<Method, AsyncTaskExecutor>(16);
|
||||||
|
@ -152,6 +164,32 @@ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
|
||||||
*/
|
*/
|
||||||
protected abstract String getExecutorQualifier(Method method);
|
protected abstract String getExecutorQualifier(Method method);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delegate for actually executing the given task with the chosen executor.
|
||||||
|
* @param task the task to execute
|
||||||
|
* @param executor the chosen executor
|
||||||
|
* @param returnType the declared return type (potentially a {@link Future} variant)
|
||||||
|
* @return the execution result (potentially a corresponding {@link Future} handle)
|
||||||
|
*/
|
||||||
|
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
|
||||||
|
if (completableFuturePresent) {
|
||||||
|
Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
|
||||||
|
if (result != null) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ListenableFuture.class.isAssignableFrom(returnType)) {
|
||||||
|
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
|
||||||
|
}
|
||||||
|
else if (Future.class.isAssignableFrom(returnType)) {
|
||||||
|
return executor.submit(task);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
executor.submit(task);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles a fatal error thrown while asynchronously invoking the specified
|
* Handles a fatal error thrown while asynchronously invoking the specified
|
||||||
* {@link Method}.
|
* {@link Method}.
|
||||||
|
@ -180,4 +218,29 @@ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inner class to avoid a hard dependency on Java 8.
|
||||||
|
*/
|
||||||
|
@UsesJava8
|
||||||
|
private static class CompletableFutureDelegate {
|
||||||
|
|
||||||
|
public static <T> Future<T> processCompletableFuture(Class<?> returnType, final Callable<T> task, Executor executor) {
|
||||||
|
if (!CompletableFuture.class.isAssignableFrom(returnType)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return CompletableFuture.supplyAsync(new Supplier<T>() {
|
||||||
|
@Override
|
||||||
|
public T get() {
|
||||||
|
try {
|
||||||
|
return task.call();
|
||||||
|
}
|
||||||
|
catch (Throwable ex) {
|
||||||
|
throw new CompletionException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, executor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,9 @@ package org.springframework.aop.interceptor;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.CompletionException;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
import org.aopalliance.intercept.MethodInterceptor;
|
import org.aopalliance.intercept.MethodInterceptor;
|
||||||
import org.aopalliance.intercept.MethodInvocation;
|
import org.aopalliance.intercept.MethodInvocation;
|
||||||
|
@ -31,11 +28,8 @@ import org.aopalliance.intercept.MethodInvocation;
|
||||||
import org.springframework.aop.support.AopUtils;
|
import org.springframework.aop.support.AopUtils;
|
||||||
import org.springframework.core.BridgeMethodResolver;
|
import org.springframework.core.BridgeMethodResolver;
|
||||||
import org.springframework.core.Ordered;
|
import org.springframework.core.Ordered;
|
||||||
import org.springframework.core.task.AsyncListenableTaskExecutor;
|
|
||||||
import org.springframework.core.task.AsyncTaskExecutor;
|
import org.springframework.core.task.AsyncTaskExecutor;
|
||||||
import org.springframework.lang.UsesJava8;
|
|
||||||
import org.springframework.util.ClassUtils;
|
import org.springframework.util.ClassUtils;
|
||||||
import org.springframework.util.concurrent.ListenableFuture;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* AOP Alliance {@code MethodInterceptor} that processes method invocations
|
* AOP Alliance {@code MethodInterceptor} that processes method invocations
|
||||||
|
@ -69,13 +63,7 @@ import org.springframework.util.concurrent.ListenableFuture;
|
||||||
* @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
|
* @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
|
||||||
* @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
|
* @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
|
||||||
*/
|
*/
|
||||||
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport
|
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
|
||||||
implements MethodInterceptor, Ordered {
|
|
||||||
|
|
||||||
// Java 8's CompletableFuture type present?
|
|
||||||
private static final boolean completableFuturePresent = ClassUtils.isPresent(
|
|
||||||
"java.util.concurrent.CompletableFuture", AsyncExecutionInterceptor.class.getClassLoader());
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new {@code AsyncExecutionInterceptor}.
|
* Create a new {@code AsyncExecutionInterceptor}.
|
||||||
|
@ -132,23 +120,7 @@ public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Class<?> returnType = invocation.getMethod().getReturnType();
|
return doSubmit(task, executor, invocation.getMethod().getReturnType());
|
||||||
if (completableFuturePresent) {
|
|
||||||
Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
|
|
||||||
if (result != null) {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (ListenableFuture.class.isAssignableFrom(returnType)) {
|
|
||||||
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
|
|
||||||
}
|
|
||||||
else if (Future.class.isAssignableFrom(returnType)) {
|
|
||||||
return executor.submit(task);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
executor.submit(task);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -169,29 +141,4 @@ public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport
|
||||||
return Ordered.HIGHEST_PRECEDENCE;
|
return Ordered.HIGHEST_PRECEDENCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Inner class to avoid a hard dependency on Java 8.
|
|
||||||
*/
|
|
||||||
@UsesJava8
|
|
||||||
private static class CompletableFutureDelegate {
|
|
||||||
|
|
||||||
public static <T> Future<T> processCompletableFuture(Class<?> returnType, final Callable<T> task, Executor executor) {
|
|
||||||
if (!CompletableFuture.class.isAssignableFrom(returnType)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return CompletableFuture.supplyAsync(new Supplier<T>() {
|
|
||||||
@Override
|
|
||||||
public T get() {
|
|
||||||
try {
|
|
||||||
return task.call();
|
|
||||||
}
|
|
||||||
catch (Throwable ex) {
|
|
||||||
throw new CompletionException(ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, executor);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2014 the original author or authors.
|
* Copyright 2002-2015 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -62,11 +62,13 @@ public abstract aspect AbstractAsyncExecutionAspect extends AsyncExecutionAspect
|
||||||
@SuppressAjWarnings("adviceDidNotMatch")
|
@SuppressAjWarnings("adviceDidNotMatch")
|
||||||
Object around() : asyncMethod() {
|
Object around() : asyncMethod() {
|
||||||
final MethodSignature methodSignature = (MethodSignature) thisJoinPointStaticPart.getSignature();
|
final MethodSignature methodSignature = (MethodSignature) thisJoinPointStaticPart.getSignature();
|
||||||
|
|
||||||
AsyncTaskExecutor executor = determineAsyncExecutor(methodSignature.getMethod());
|
AsyncTaskExecutor executor = determineAsyncExecutor(methodSignature.getMethod());
|
||||||
if (executor == null) {
|
if (executor == null) {
|
||||||
return proceed();
|
return proceed();
|
||||||
}
|
}
|
||||||
Callable<Object> callable = new Callable<Object>() {
|
|
||||||
|
Callable<Object> task = new Callable<Object>() {
|
||||||
public Object call() throws Exception {
|
public Object call() throws Exception {
|
||||||
try {
|
try {
|
||||||
Object result = proceed();
|
Object result = proceed();
|
||||||
|
@ -80,17 +82,7 @@ public abstract aspect AbstractAsyncExecutionAspect extends AsyncExecutionAspect
|
||||||
return null;
|
return null;
|
||||||
}};
|
}};
|
||||||
|
|
||||||
Class<?> returnType = methodSignature.getReturnType();
|
return doSubmit(task, executor, methodSignature.getReturnType());
|
||||||
if (ListenableFuture.class.isAssignableFrom(returnType)) {
|
|
||||||
return ((AsyncListenableTaskExecutor) executor).submitListenable(callable);
|
|
||||||
}
|
|
||||||
else if (Future.class.isAssignableFrom(returnType)) {
|
|
||||||
return executor.submit(callable);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
executor.submit(callable);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue