SPR-7369: @Async support in spring-aspects with AspectJ
This commit is contained in:
parent
e1fb19f4e1
commit
00984781af
|
|
@ -0,0 +1,58 @@
|
||||||
|
package org.springframework.scheduling;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.aspectj.lang.reflect.MethodSignature;
|
||||||
|
import org.springframework.core.task.AsyncTaskExecutor;
|
||||||
|
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||||
|
import org.springframework.core.task.support.TaskExecutorAdapter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract aspect that routes selected methods asynchronously.
|
||||||
|
* <p>
|
||||||
|
* This aspect, by default, uses {@link SimpleAsyncTaskExecutor} to route method
|
||||||
|
* execution. However, you may inject it with any implementation of
|
||||||
|
* {@link Executor} to override the default.
|
||||||
|
*
|
||||||
|
* @author Ramnivas Laddad
|
||||||
|
*/
|
||||||
|
public abstract aspect AbstractAsynchronousExecutionAspect {
|
||||||
|
private AsyncTaskExecutor asyncExecutor;
|
||||||
|
|
||||||
|
public AbstractAsynchronousExecutionAspect() {
|
||||||
|
// Set default executor, which may be replaced by calling setExecutor(Executor)
|
||||||
|
setExecutor(new SimpleAsyncTaskExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract pointcut asyncMethod();
|
||||||
|
|
||||||
|
Object around() : asyncMethod() {
|
||||||
|
Callable<Object> callable = new Callable<Object>() {
|
||||||
|
public Object call() throws Exception {
|
||||||
|
Object result = proceed();
|
||||||
|
if (result instanceof Future) {
|
||||||
|
return ((Future<?>) result).get();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}};
|
||||||
|
|
||||||
|
Future<?> result = asyncExecutor.submit(callable);
|
||||||
|
|
||||||
|
if (Future.class.isAssignableFrom(((MethodSignature)thisJoinPointStaticPart.getSignature()).getReturnType())) {
|
||||||
|
return result;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExecutor(Executor executor) {
|
||||||
|
if (executor instanceof AsyncTaskExecutor) {
|
||||||
|
this.asyncExecutor = (AsyncTaskExecutor) executor;
|
||||||
|
} else {
|
||||||
|
this.asyncExecutor = new TaskExecutorAdapter(asyncExecutor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
package org.springframework.scheduling;
|
||||||
|
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aspect to route methods based on the {@link Async} annotation.
|
||||||
|
* <p>
|
||||||
|
* This aspect routes methods marked with the {@link Async} annotation
|
||||||
|
* as well as methods in classes marked with the same. Any method expected
|
||||||
|
* to be routed asynchronously must return either void, {@link Future},
|
||||||
|
* or a subtype of {@link Future}. This aspect, therefore, will produce
|
||||||
|
* a compile-time error for methods that violate this constraint on the return type.
|
||||||
|
* If, however, a class marked with <code>@Async</code> contains a method that
|
||||||
|
* violates this constraint, it produces only a warning.
|
||||||
|
*
|
||||||
|
* @author Ramnivas Laddad
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public aspect AnnotationDrivenAsynchronousExecutionAspect extends AbstractAsynchronousExecutionAspect {
|
||||||
|
private pointcut asyncMarkedMethod()
|
||||||
|
: execution(@Async (void || Future+) *(..));
|
||||||
|
private pointcut asyncTypeMarkedMethod()
|
||||||
|
: execution((void || Future+) (@Async *).*(..));
|
||||||
|
|
||||||
|
public pointcut asyncMethod() : asyncMarkedMethod() || asyncTypeMarkedMethod();
|
||||||
|
|
||||||
|
declare error:
|
||||||
|
execution(@Async !(void||Future) *(..)):
|
||||||
|
"Only method that return void or Future may have @Async annotation";
|
||||||
|
|
||||||
|
declare warning:
|
||||||
|
execution(!(void||Future) (@Async *).*(..)):
|
||||||
|
"Method in class marked with @Async that do not return void or Future will be routed synchronously";
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,161 @@
|
||||||
|
package org.springframework.scheduling;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import static junit.framework.Assert.*;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.scheduling.annotation.AsyncResult;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for AnnotationDrivenAsynchronousExecutionAspect
|
||||||
|
*
|
||||||
|
* @author Ramnivas Laddad
|
||||||
|
*/
|
||||||
|
public class AnnotationDrivenAsynchronousExecutionAspectTest {
|
||||||
|
private static final long WAIT_TIME = 1000; //milli seconds
|
||||||
|
private CountingExecutor executor;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
executor = new CountingExecutor();
|
||||||
|
AnnotationDrivenAsynchronousExecutionAspect.aspectOf().setExecutor(executor);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void asyncMethodGetsRoutedAsynchronously() {
|
||||||
|
ClassWithoutAsyncAnnotation obj = new ClassWithoutAsyncAnnotation();
|
||||||
|
obj.incrementAsync();
|
||||||
|
executor.waitForCompletion();
|
||||||
|
assertEquals(1, obj.counter);
|
||||||
|
assertEquals(1, executor.submitStartCounter);
|
||||||
|
assertEquals(1, executor.submitCompleteCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void asyncMethodReturningFutureGetsRoutedAsynchronouslyAndReturnsAFuture() throws InterruptedException, ExecutionException {
|
||||||
|
ClassWithoutAsyncAnnotation obj = new ClassWithoutAsyncAnnotation();
|
||||||
|
Future<Integer> future = obj.incrementReturningAFuture();
|
||||||
|
// No need to executor.waitForCompletion() as future.get() will have the same effect
|
||||||
|
assertEquals(5, future.get().intValue());
|
||||||
|
assertEquals(1, obj.counter);
|
||||||
|
assertEquals(1, executor.submitStartCounter);
|
||||||
|
assertEquals(1, executor.submitCompleteCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void syncMethodGetsRoutedSynchronously() {
|
||||||
|
ClassWithoutAsyncAnnotation obj = new ClassWithoutAsyncAnnotation();
|
||||||
|
obj.increment();
|
||||||
|
assertEquals(1, obj.counter);
|
||||||
|
assertEquals(0, executor.submitStartCounter);
|
||||||
|
assertEquals(0, executor.submitCompleteCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void voidMethodInAsyncClassGetsRoutedAsynchronously() {
|
||||||
|
ClassWithAsyncAnnotation obj = new ClassWithAsyncAnnotation();
|
||||||
|
obj.increment();
|
||||||
|
executor.waitForCompletion();
|
||||||
|
assertEquals(1, obj.counter);
|
||||||
|
assertEquals(1, executor.submitStartCounter);
|
||||||
|
assertEquals(1, executor.submitCompleteCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void methodReturningFutureInAsyncClassGetsRoutedAsynchronouslyAndReturnsAFuture() throws InterruptedException, ExecutionException {
|
||||||
|
ClassWithAsyncAnnotation obj = new ClassWithAsyncAnnotation();
|
||||||
|
Future<Integer> future = obj.incrementReturningAFuture();
|
||||||
|
assertEquals(5, future.get().intValue());
|
||||||
|
assertEquals(1, obj.counter);
|
||||||
|
assertEquals(1, executor.submitStartCounter);
|
||||||
|
assertEquals(1, executor.submitCompleteCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void methodReturningNonVoidNonFutureInAsyncClassGetsRoutedSynchronously() {
|
||||||
|
ClassWithAsyncAnnotation obj = new ClassWithAsyncAnnotation();
|
||||||
|
int returnValue = obj.return5();
|
||||||
|
assertEquals(5, returnValue);
|
||||||
|
assertEquals(0, executor.submitStartCounter);
|
||||||
|
assertEquals(0, executor.submitCompleteCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
private static class CountingExecutor extends SimpleAsyncTaskExecutor {
|
||||||
|
int submitStartCounter;
|
||||||
|
int submitCompleteCounter;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<T> submit(Callable<T> task) {
|
||||||
|
submitStartCounter++;
|
||||||
|
Future<T> future = super.submit(task);
|
||||||
|
submitCompleteCounter++;
|
||||||
|
synchronized (this) {
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void waitForCompletion() {
|
||||||
|
try {
|
||||||
|
wait(WAIT_TIME);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Assert.fail("Didn't finish the async job in " + WAIT_TIME + " milliseconds");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ClassWithoutAsyncAnnotation {
|
||||||
|
int counter;
|
||||||
|
|
||||||
|
@Async public void incrementAsync() {
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void increment() {
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Async public Future<Integer> incrementReturningAFuture() {
|
||||||
|
counter++;
|
||||||
|
return new AsyncResult<Integer>(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
// It should be an error to attach @Async to a method that returns a non-void
|
||||||
|
// or non-Future.
|
||||||
|
// We need to keep this commented out, otherwise there will be a compile-time error.
|
||||||
|
// Please uncomment and re-comment this periodically to check that the compiler
|
||||||
|
// produces an error message due to the 'declare error' statement
|
||||||
|
// in AnnotationDrivenAsynchronousExecutionAspect
|
||||||
|
// @Async public int getInt() {
|
||||||
|
// return 0;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
@Async
|
||||||
|
static class ClassWithAsyncAnnotation {
|
||||||
|
int counter;
|
||||||
|
|
||||||
|
public void increment() {
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manually check that there is a warning from the 'declare warning' statement in AnnotationDrivenAsynchronousExecutionAspect
|
||||||
|
public int return5() {
|
||||||
|
return 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Future<Integer> incrementReturningAFuture() {
|
||||||
|
counter++;
|
||||||
|
return new AsyncResult<Integer>(5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue