Make DeferredResult more usable and testable

DeferredResult now has a setErrorResult method that can be set to an
Exception or an error object, error view, etc.

The new isSetOrExpired() method can be used to check pro-actively if
the DeferredResult is still usable or not.

The setDeferredResultHandler method is now public so tests may use it.

Issue: SPR-9690, SPR-9689
This commit is contained in:
Rossen Stoyanchev 2012-08-16 00:12:23 -04:00
parent 3b9833c538
commit 4407f6a4c0
9 changed files with 190 additions and 218 deletions

View File

@ -52,9 +52,9 @@ public interface AsyncWebRequest extends NativeWebRequest {
void startAsync();
/**
* Whether the request is in asynchronous mode after a call to {@link #startAsync()}.
* Returns "false" if asynchronous processing never started, has completed, or the
* request was dispatched for further processing.
* Whether the request is in async mode following a call to {@link #startAsync()}.
* Returns "false" if asynchronous processing never started, has completed,
* or the request was dispatched for further processing.
*/
boolean isAsyncStarted();
@ -65,13 +65,13 @@ public interface AsyncWebRequest extends NativeWebRequest {
void dispatch();
/**
* Whether the request was dispatched to the container.
* Whether the request was dispatched to the container in order to resume
* processing after concurrent execution in an application thread.
*/
boolean isDispatched();
/**
* Whether asynchronous processing has completed in which case the request
* response should no longer be used.
* Whether asynchronous processing has completed.
*/
boolean isAsyncComplete();

View File

@ -13,136 +13,121 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.context.request.async;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.util.Assert;
/**
* DeferredResult provides an alternative to using a Callable for async request
* processing. With a Callable the framework manages a thread on behalf of the
* application through an {@link AsyncTaskExecutor}. With a DeferredResult the
* application sets the result in a thread of its choice.
*
* <p>The following sequence describes the intended use scenario:
* <ol>
* <li>thread-1: framework calls application method
* <li>thread-1: application method returns a DeferredResult
* <li>thread-1: framework initializes DeferredResult
* <li>thread-2: application calls {@link #set(Object)}
* <li>thread-2: framework completes async processing with given result
* </ol>
*
* <p>If the application calls {@link #set(Object)} in thread-2 before the
* DeferredResult is initialized by the framework in thread-1, then thread-2
* will block and wait for the initialization to complete. Therefore an
* application should never create and set the DeferredResult in the same
* thread because the initialization will never complete.</p>
* {@code DeferredResult} provides an alternative to returning a {@link Callable}
* for asynchronous request processing. While with a Callable, a thread is used
* to execute it on behalf of the application, with a DeferredResult the application
* sets the result whenever it needs to from a thread of its choice.
*
* @author Rossen Stoyanchev
* @since 3.2
*/
public final class DeferredResult<V> {
public final class DeferredResult<T> {
private static final Log logger = LogFactory.getLog(DeferredResult.class);
private V result;
private static final Object RESULT_NONE = new Object();
private Object result = RESULT_NONE;
private final Object timeoutResult;
private final AtomicBoolean expired = new AtomicBoolean(false);
private DeferredResultHandler resultHandler;
private final V timeoutValue;
private final Object lock = new Object();
private final boolean timeoutValueSet;
private final CountDownLatch latch = new CountDownLatch(1);
private boolean timeoutValueUsed;
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private final ReentrantLock setLock = new ReentrantLock();
/**
* Create a new instance.
* Create a DeferredResult instance.
*/
public DeferredResult() {
this.timeoutValue = null;
this.timeoutValueSet = false;
this(RESULT_NONE);
}
/**
* Create a new instance also providing a default value to set if a timeout
* occurs before {@link #set(Object)} is called.
* Create a DeferredResult with a default result to use in case of a timeout.
* @param timeoutResult the result to use
*/
public DeferredResult(V timeoutValue) {
this.timeoutValue = timeoutValue;
this.timeoutValueSet = true;
public DeferredResult(Object timeoutResult) {
this.timeoutResult = timeoutResult;
}
/**
* Complete async processing with the given value. If the DeferredResult is
* not fully initialized yet, this method will block and wait for that to
* occur before proceeding. See the class level javadoc for more details.
*
* @throws StaleAsyncWebRequestException if the underlying async request
* has already timed out or ended due to a network error.
* Set a handler to handle the result when set. Normally applications do not
* use this method at runtime but may do so during testing.
*/
public void set(V value) throws StaleAsyncWebRequestException {
if (this.setLock.tryLock() && (!this.timeoutValueUsed)) {
public void setResultHandler(DeferredResultHandler resultHandler) {
this.resultHandler = resultHandler;
this.latch.countDown();
}
/**
* Set the result value and pass it on for handling.
* @param result the result value
* @return "true" if the result was set and passed on for handling;
* "false" if the result was already set or the async request expired.
* @see #isSetOrExpired()
*/
public boolean setResult(T result) {
return processResult(result);
}
/**
* Set an error result value and pass it on for handling. If the result is an
* {@link Exception} or {@link Throwable}, it will be processed as though the
* controller raised the exception. Otherwise it will be processed as if the
* controller returned the given result.
* @param result the error result value
* @return "true" if the result was set to the error value and passed on for handling;
* "false" if the result was already set or the async request expired.
* @see #isSetOrExpired()
*/
public boolean setErrorResult(Object result) {
return processResult(result);
}
private boolean processResult(Object result) {
synchronized (this.lock) {
if (isSetOrExpired()) {
return false;
}
this.result = result;
if (!awaitResultHandler()) {
throw new IllegalStateException("DeferredResultHandler not set");
}
try {
handle(value);
this.resultHandler.handleResult(result);
}
finally {
this.setLock.unlock();
catch (Throwable t) {
logger.trace("DeferredResult not handled", t);
return false;
}
}
else {
// A timeout is in progress or has already occurred
throw new StaleAsyncWebRequestException("Async request timed out");
}
}
/**
* An alternative to {@link #set(Object)} that absorbs a potential
* {@link StaleAsyncWebRequestException}.
* @return {@code false} if the outcome was a {@code StaleAsyncWebRequestException}
*/
public boolean trySet(V result) throws StaleAsyncWebRequestException {
try {
set(result);
return true;
}
catch (StaleAsyncWebRequestException ex) {
// absorb
}
return false;
}
private void handle(V result) throws StaleAsyncWebRequestException {
Assert.isNull(this.result, "A deferred result can be set once only");
this.result = result;
this.timeoutValueUsed = (this.timeoutValueSet && (this.result == this.timeoutValue));
if (!await()) {
throw new IllegalStateException(
"Gave up on waiting for DeferredResult to be initialized. " +
"Are you perhaps creating and setting a DeferredResult in the same thread? " +
"The DeferredResult must be fully initialized before you can set it. " +
"See the class javadoc for more details");
}
if (this.timeoutValueUsed) {
logger.debug("Using default timeout value");
}
this.resultHandler.handle(result);
}
private boolean await() {
private boolean awaitResultHandler() {
try {
return this.initializationLatch.await(10, TimeUnit.SECONDS);
return this.latch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
return false;
@ -150,43 +135,32 @@ public final class DeferredResult<V> {
}
/**
* Return a handler to use to complete processing using the default timeout value
* provided via {@link #DeferredResult(Object)} or {@code null} if no timeout
* value was provided.
* Whether the DeferredResult can no longer be set either because the async
* request expired or because it was already set.
*/
Runnable getTimeoutHandler() {
if (!this.timeoutValueSet) {
return null;
}
return new Runnable() {
public void run() { useTimeoutValue(); }
};
public boolean isSetOrExpired() {
return (this.expired.get() || (this.result != RESULT_NONE));
}
private void useTimeoutValue() {
this.setLock.lock();
try {
if (this.result == null) {
handle(this.timeoutValue);
this.timeoutValueUsed = true;
}
} finally {
this.setLock.unlock();
}
void setExpired() {
this.expired.set(true);
}
void init(DeferredResultHandler handler) {
this.resultHandler = handler;
this.initializationLatch.countDown();
boolean hasTimeoutResult() {
return this.timeoutResult != RESULT_NONE;
}
boolean applyTimeoutResult() {
return hasTimeoutResult() ? processResult(this.timeoutResult) : false;
}
/**
* Completes processing when {@link DeferredResult#set(Object)} is called.
* Handles a DeferredResult value when set.
*/
interface DeferredResultHandler {
public interface DeferredResultHandler {
void handle(Object result) throws StaleAsyncWebRequestException;
void handleResult(Object result);
}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.context.request.async;
/**
* Raised if a stale AsyncWebRequest is detected during async request processing.
*
* @author Rossen Stoyanchev
* @since 3.2
*/
@SuppressWarnings("serial")
public class StaleAsyncWebRequestException extends RuntimeException {
public StaleAsyncWebRequestException(String message) {
super(message);
}
}

View File

@ -55,6 +55,12 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
private final List<Runnable> completionHandlers = new ArrayList<Runnable>();
/**
* Create a new instance for the given request/response pair.
* @param request current HTTP request
* @param response current HTTP response
*/
public StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
super(request, response);
}
@ -94,7 +100,6 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
return this.asyncCompleted.get();
}
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +

View File

@ -269,24 +269,35 @@ public final class WebAsyncManager {
startAsyncProcessing(processingContext);
deferredResult.init(new DeferredResultHandler() {
this.asyncWebRequest.addCompletionHandler(new Runnable() {
public void run() {
deferredResult.setExpired();
}
});
public void handle(Object result) {
if (deferredResult.hasTimeoutResult()) {
this.asyncWebRequest.setTimeoutHandler(new Runnable() {
public void run() {
deferredResult.applyTimeoutResult();
}
});
}
deferredResult.setResultHandler(new DeferredResultHandler() {
public void handleResult(Object result) {
concurrentResult = result;
if (logger.isDebugEnabled()) {
logger.debug("Deferred result value [" + concurrentResult + "]");
}
if (asyncWebRequest.isAsyncComplete()) {
throw new StaleAsyncWebRequestException("Could not complete processing due to a timeout or network error");
}
Assert.state(!asyncWebRequest.isAsyncComplete(),
"Cannot handle DeferredResult [ " + deferredResult + " ] due to a timeout or network error");
logger.debug("Dispatching request to complete processing");
asyncWebRequest.dispatch();
}
});
this.asyncWebRequest.setTimeoutHandler(deferredResult.getTimeoutHandler());
}
private void startAsyncProcessing(Object... context) {

View File

@ -17,16 +17,13 @@
package org.springframework.web.context.request.async;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.StaleAsyncWebRequestException;
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
/**
@ -37,79 +34,96 @@ import org.springframework.web.context.request.async.DeferredResult.DeferredResu
public class DeferredResultTests {
@Test
public void set() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
DeferredResult<String> deferredResult = new DeferredResult<String>();
deferredResult.init(resultHandler);
public void setResult() {
DeferredResultHandler handler = createMock(DeferredResultHandler.class);
handler.handleResult("hello");
replay(handler);
resultHandler.handle("foo");
replay(resultHandler);
DeferredResult<String> result = new DeferredResult<String>();
result.setResultHandler(handler);
deferredResult.set("foo");
assertTrue(result.setResult("hello"));
verify(resultHandler);
verify(handler);
}
@Test
public void getTimeoutHandler() {
assertNull(new DeferredResult<String>().getTimeoutHandler());
assertNotNull(new DeferredResult<String>("foo").getTimeoutHandler());
public void setResultTwice() {
DeferredResultHandler handler = createMock(DeferredResultHandler.class);
handler.handleResult("hello");
replay(handler);
DeferredResult<String> result = new DeferredResult<String>();
result.setResultHandler(handler);
assertTrue(result.setResult("hello"));
assertFalse(result.setResult("hi"));
verify(handler);
}
@Test
public void handleTimeout() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
resultHandler.handle("foo");
replay(resultHandler);
public void setResultWithException() {
DeferredResultHandler handler = createMock(DeferredResultHandler.class);
handler.handleResult("hello");
expectLastCall().andThrow(new IllegalStateException());
replay(handler);
DeferredResult<String> deferredResult = new DeferredResult<String>("foo");
deferredResult.init(resultHandler);
DeferredResult<String> result = new DeferredResult<String>();
result.setResultHandler(handler);
deferredResult.getTimeoutHandler().run();
assertFalse(result.setResult("hello"));
verify(resultHandler);
verify(handler);
}
@Test
public void setAfterTimeoutValueUsed() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
resultHandler.handle("foo");
replay(resultHandler);
public void isSetOrExpired() {
DeferredResultHandler handler = createMock(DeferredResultHandler.class);
handler.handleResult("hello");
replay(handler);
DeferredResult<String> deferredResult = new DeferredResult<String>("foo");
deferredResult.init(resultHandler);
DeferredResult<String> result = new DeferredResult<String>();
result.setResultHandler(handler);
deferredResult.getTimeoutHandler().run();
assertFalse(result.isSetOrExpired());
verify(resultHandler);
result.setResult("hello");
try {
deferredResult.set("foo");
fail("Expected exception");
}
catch (StaleAsyncWebRequestException ex) {
// expected
}
assertTrue(result.isSetOrExpired());
verify(handler);
}
@Test
public void setBeforeTimeoutValueUsed() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
resultHandler.handle("foo");
replay(resultHandler);
public void setExpired() {
DeferredResult<String> result = new DeferredResult<String>();
assertFalse(result.isSetOrExpired());
DeferredResult<String> deferredResult = new DeferredResult<String>("foo");
deferredResult.init(resultHandler);
deferredResult.set("foo");
result.setExpired();
assertTrue(result.isSetOrExpired());
assertFalse(result.setResult("hello"));
}
verify(resultHandler);
@Test
public void hasTimeout() {
assertFalse(new DeferredResult<String>().hasTimeoutResult());
assertTrue(new DeferredResult<String>("timed out").hasTimeoutResult());
}
reset(resultHandler);
replay(resultHandler);
@Test
public void applyTimeoutResult() {
DeferredResultHandler handler = createMock(DeferredResultHandler.class);
handler.handleResult("timed out");
replay(handler);
deferredResult.getTimeoutHandler().run();
DeferredResult<String> result = new DeferredResult<String>("timed out");
result.setResultHandler(handler);
verify(resultHandler);
assertTrue(result.applyTimeoutResult());
assertFalse("Shouldn't be able to set result after timeout", result.setResult("hello"));
verify(handler);
}
}

View File

@ -141,17 +141,17 @@ public class WebAsyncManagerTests {
assertTrue(this.asyncManager.isConcurrentHandlingStarted());
deferredResult.set(25);
deferredResult.setResult(25);
assertEquals(25, this.asyncManager.getConcurrentResult());
}
@Test(expected=StaleAsyncWebRequestException.class)
public void startDeferredResultProcessing_staleRequest() throws Exception {
@Test
public void startDeferredResultProcessingStaleRequest() throws Exception {
DeferredResult<Integer> deferredResult = new DeferredResult<Integer>();
this.asyncManager.startDeferredResultProcessing(deferredResult);
this.stubAsyncWebRequest.setAsyncComplete(true);
deferredResult.set(1);
assertFalse(deferredResult.setResult(1));
}
@Test

View File

@ -705,7 +705,7 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter i
if (logger.isDebugEnabled()) {
logger.debug("Found concurrent result value [" + result + "]");
}
requestMappingMethod = requestMappingMethod.wrapConcurrentProcessingResult(result);
requestMappingMethod = requestMappingMethod.wrapConcurrentResult(result);
}
requestMappingMethod.invokeAndHandle(webRequest, mavContainer);

View File

@ -167,7 +167,7 @@ public class ServletInvocableHandlerMethod extends InvocableHandlerMethod {
* from an async operation essentially either applying return value handling or
* raising an exception if the end result is an Exception.
*/
ServletInvocableHandlerMethod wrapConcurrentProcessingResult(final Object result) {
ServletInvocableHandlerMethod wrapConcurrentResult(final Object result) {
return new CallableHandlerMethod(new Callable<Object>() {