AsyncExecutionInterceptor uses submitListenable if method signature indicates ListenableFuture

Issue: SPR-11909
This commit is contained in:
Juergen Hoeller 2014-06-26 15:19:17 +02:00
parent 50b21d061f
commit 46dc07a005
4 changed files with 60 additions and 23 deletions

View File

@ -24,6 +24,7 @@ import java.util.concurrent.Executor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.util.Assert;
@ -40,6 +41,7 @@ import org.springframework.util.StringUtils;
* bean to be used when executing it, e.g. through an annotation attribute.
*
* @author Chris Beams
* @author Juergen Hoeller
* @since 3.1.2
*/
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
@ -87,6 +89,7 @@ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
/**
* Determine the specific executor to use when executing the given method.
* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor has been set)
*/
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
@ -103,8 +106,8 @@ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
else if (executorToUse == null) {
return null;
}
executor = (executorToUse instanceof AsyncTaskExecutor ?
(AsyncTaskExecutor) executorToUse : new TaskExecutorAdapter(executorToUse));
executor = (executorToUse instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) executorToUse : new TaskExecutorAdapter(executorToUse));
this.executors.put(method, executor);
}
return executor;

View File

@ -29,9 +29,11 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.Ordered;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
/**
* AOP Alliance {@code MethodInterceptor} that processes method invocations
@ -120,27 +122,31 @@ public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Future<?> result = executor.submit(
new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
Callable<Object> task = new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
});
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
}
};
if (Future.class.isAssignableFrom(invocation.getMethod().getReturnType())) {
return result;
Class<?> returnType = invocation.getMethod().getReturnType();
if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,18 +16,24 @@
package org.springframework.scheduling.annotation;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* A pass-through {@code Future} handle that can be used for method signatures
* which are declared with a Future return type for asynchronous execution.
* which are declared with a {@code Future} return type for asynchronous execution.
*
* <p>As of Spring 4.1, this class implements {@link ListenableFuture}, not just
* plain {@link java.util.concurrent.Future}, along with the corresponding support
* in {@code @Async} processing.
*
* @author Juergen Hoeller
* @since 3.0
* @see Async
*/
public class AsyncResult<V> implements Future<V> {
public class AsyncResult<V> implements ListenableFuture<V> {
private final V value;
@ -40,6 +46,7 @@ public class AsyncResult<V> implements Future<V> {
this.value = value;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
@ -65,4 +72,9 @@ public class AsyncResult<V> implements Future<V> {
return this.value;
}
@Override
public void addCallback(ListenableFutureCallback<? super V> callback) {
callback.onSuccess(this.value);
}
}

View File

@ -36,6 +36,7 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import static org.junit.Assert.*;
@ -65,6 +66,8 @@ public class AsyncExecutionTests {
asyncTest.doSomething(10);
Future<String> future = asyncTest.returnSomething(20);
assertEquals("20", future.get());
ListenableFuture<String> listenableFuture = asyncTest.returnSomethingListenable(20);
assertEquals("20", listenableFuture.get());
}
@Test
@ -134,6 +137,8 @@ public class AsyncExecutionTests {
asyncTest.doSomething(10);
Future<String> future = asyncTest.returnSomething(20);
assertEquals("20", future.get());
ListenableFuture<String> listenableFuture = asyncTest.returnSomethingListenable(20);
assertEquals("20", listenableFuture.get());
}
@Test
@ -348,6 +353,12 @@ public class AsyncExecutionTests {
assertTrue(!Thread.currentThread().getName().equals(originalThreadName));
return new AsyncResult<String>(Integer.toString(i));
}
@Async
public ListenableFuture<String> returnSomethingListenable(int i) {
assertTrue(!Thread.currentThread().getName().equals(originalThreadName));
return new AsyncResult<String>(Integer.toString(i));
}
}
@ -410,6 +421,11 @@ public class AsyncExecutionTests {
return new AsyncResult<String>(Integer.toString(i));
}
public ListenableFuture<String> returnSomethingListenable(int i) {
assertTrue(!Thread.currentThread().getName().equals(originalThreadName));
return new AsyncResult<String>(Integer.toString(i));
}
@Override
public void destroy() {
}