diff --git a/spring-orm/src/main/java/org/springframework/orm/hibernate5/support/AsyncRequestInterceptor.java b/spring-orm/src/main/java/org/springframework/orm/hibernate5/support/AsyncRequestInterceptor.java index dfc31c1b29..d3ea3a8df9 100644 --- a/spring-orm/src/main/java/org/springframework/orm/hibernate5/support/AsyncRequestInterceptor.java +++ b/spring-orm/src/main/java/org/springframework/orm/hibernate5/support/AsyncRequestInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,7 @@ import org.springframework.web.context.request.async.DeferredResultProcessingInt * * Ensures the following: * 1) The session is bound/unbound when "callable processing" is started - * 2) The session is closed if an async request times out + * 2) The session is closed if an async request times out or an error occurred * * @author Rossen Stoyanchev * @since 4.2 @@ -51,6 +51,8 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple private volatile boolean timeoutInProgress; + private volatile boolean errorInProgress; + public AsyncRequestInterceptor(SessionFactory sessionFactory, SessionHolder sessionHolder) { this.sessionFactory = sessionFactory; @@ -65,6 +67,7 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple public void bindSession() { this.timeoutInProgress = false; + this.errorInProgress = false; TransactionSynchronizationManager.bindResource(this.sessionFactory, this.sessionHolder); } @@ -80,13 +83,19 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple } @Override - public void afterCompletion(NativeWebRequest request, Callable task) throws Exception { - closeAfterTimeout(); + public Object handleError(NativeWebRequest request, Callable task, Throwable t) { + this.errorInProgress = true; + return RESULT_NONE; // give other interceptors a chance to handle the error } - private void closeAfterTimeout() { - if (this.timeoutInProgress) { - logger.debug("Closing Hibernate Session after async request timeout"); + @Override + public void afterCompletion(NativeWebRequest request, Callable task) throws Exception { + closeSession(); + } + + private void closeSession() { + if (this.timeoutInProgress || this.errorInProgress) { + logger.debug("Closing Hibernate Session after async request timeout/error"); SessionFactoryUtils.closeSession(this.sessionHolder.getSession()); } } @@ -112,9 +121,15 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple return true; // give other interceptors a chance to handle the timeout } + @Override + public boolean handleError(NativeWebRequest request, DeferredResult deferredResult, Throwable t) { + this.errorInProgress = true; + return true; // give other interceptors a chance to handle the error + } + @Override public void afterCompletion(NativeWebRequest request, DeferredResult deferredResult) { - closeAfterTimeout(); + closeSession(); } } diff --git a/spring-orm/src/main/java/org/springframework/orm/jpa/support/AsyncRequestInterceptor.java b/spring-orm/src/main/java/org/springframework/orm/jpa/support/AsyncRequestInterceptor.java index 7ff7e1620e..d2d3b02b25 100644 --- a/spring-orm/src/main/java/org/springframework/orm/jpa/support/AsyncRequestInterceptor.java +++ b/spring-orm/src/main/java/org/springframework/orm/jpa/support/AsyncRequestInterceptor.java @@ -36,7 +36,7 @@ import org.springframework.web.context.request.async.DeferredResultProcessingInt * * Ensures the following: * 1) The session is bound/unbound when "callable processing" is started - * 2) The session is closed if an async request times out + * 2) The session is closed if an async request times out or an error occurred * * @author Rossen Stoyanchev * @since 3.2.5 @@ -51,6 +51,8 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple private volatile boolean timeoutInProgress; + private volatile boolean errorInProgress; + public AsyncRequestInterceptor(EntityManagerFactory emFactory, EntityManagerHolder emHolder) { this.emFactory = emFactory; @@ -65,6 +67,7 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple public void bindEntityManager() { this.timeoutInProgress = false; + this.errorInProgress = false; TransactionSynchronizationManager.bindResource(this.emFactory, this.emHolder); } @@ -80,13 +83,19 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple } @Override - public void afterCompletion(NativeWebRequest request, Callable task) throws Exception { - closeAfterTimeout(); + public Object handleError(NativeWebRequest request, Callable task, Throwable t) { + this.errorInProgress = true; + return RESULT_NONE; // give other interceptors a chance to handle the error } - private void closeAfterTimeout() { - if (this.timeoutInProgress) { - logger.debug("Closing JPA EntityManager after async request timeout"); + @Override + public void afterCompletion(NativeWebRequest request, Callable task) throws Exception { + closeEntityManager(); + } + + private void closeEntityManager() { + if (this.timeoutInProgress || this.errorInProgress) { + logger.debug("Closing JPA EntityManager after async request timeout/error"); EntityManagerFactoryUtils.closeEntityManager(emHolder.getEntityManager()); } } @@ -112,9 +121,15 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple return true; // give other interceptors a chance to handle the timeout } + @Override + public boolean handleError(NativeWebRequest request, DeferredResult deferredResult, Throwable t) { + this.errorInProgress = true; + return true; // give other interceptors a chance to handle the error + } + @Override public void afterCompletion(NativeWebRequest request, DeferredResult deferredResult) { - closeAfterTimeout(); + closeEntityManager(); } } diff --git a/spring-orm/src/test/java/org/springframework/orm/jpa/support/OpenEntityManagerInViewTests.java b/spring-orm/src/test/java/org/springframework/orm/jpa/support/OpenEntityManagerInViewTests.java index be2e4b2e72..8f351c1873 100644 --- a/spring-orm/src/test/java/org/springframework/orm/jpa/support/OpenEntityManagerInViewTests.java +++ b/spring-orm/src/test/java/org/springframework/orm/jpa/support/OpenEntityManagerInViewTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -228,6 +228,48 @@ public class OpenEntityManagerInViewTests { verify(this.manager).close(); } + @Test + public void testOpenEntityManagerInViewInterceptorAsyncErrorScenario() throws Exception { + + // Initial request thread + + OpenEntityManagerInViewInterceptor interceptor = new OpenEntityManagerInViewInterceptor(); + interceptor.setEntityManagerFactory(factory); + + given(this.factory.createEntityManager()).willReturn(this.manager); + + interceptor.preHandle(this.webRequest); + assertTrue(TransactionSynchronizationManager.hasResource(this.factory)); + + AsyncWebRequest asyncWebRequest = new StandardServletAsyncWebRequest(this.request, this.response); + WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.request); + asyncManager.setTaskExecutor(new SyncTaskExecutor()); + asyncManager.setAsyncWebRequest(asyncWebRequest); + asyncManager.startCallableProcessing(new Callable() { + @Override + public String call() throws Exception { + return "anything"; + } + }); + + interceptor.afterConcurrentHandlingStarted(this.webRequest); + assertFalse(TransactionSynchronizationManager.hasResource(this.factory)); + + // Async request timeout + + given(this.manager.isOpen()).willReturn(true); + + MockAsyncContext asyncContext = (MockAsyncContext) this.request.getAsyncContext(); + for (AsyncListener listener : asyncContext.getListeners()) { + listener.onError(new AsyncEvent(asyncContext, new Exception())); + } + for (AsyncListener listener : asyncContext.getListeners()) { + listener.onComplete(new AsyncEvent(asyncContext)); + } + + verify(this.manager).close(); + } + @Test public void testOpenEntityManagerInViewFilter() throws Exception { given(manager.isOpen()).willReturn(true); diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java index 64c8072ccc..1e89d1a2fc 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.web.context.request.async; +import java.util.function.Consumer; + import org.springframework.lang.Nullable; import org.springframework.web.context.request.NativeWebRequest; @@ -42,7 +44,13 @@ public interface AsyncWebRequest extends NativeWebRequest { void addTimeoutHandler(Runnable runnable); /** - * Add a handle to invoke when request processing completes. + * Add a handler to invoke when an error occurred while concurrent + * handling of a request. + */ + void addErrorHandler(Consumer exceptionHandler); + + /** + * Add a handler to invoke when request processing completes. */ void addCompletionHandler(Runnable runnable); diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java index b0984fbee6..3c81ee1a27 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -94,6 +94,24 @@ class CallableInterceptorChain { return CallableProcessingInterceptor.RESULT_NONE; } + public Object triggerAfterError(NativeWebRequest request, Callable task, Throwable throwable) { + for (CallableProcessingInterceptor interceptor : this.interceptors) { + try { + Object result = interceptor.handleError(request, task, throwable); + if (result == CallableProcessingInterceptor.RESPONSE_HANDLED) { + break; + } + else if (result != CallableProcessingInterceptor.RESULT_NONE) { + return result; + } + } + catch (Throwable t) { + return t; + } + } + return CallableProcessingInterceptor.RESULT_NONE; + } + public void triggerAfterCompletion(NativeWebRequest request, Callable task) { for (int i = this.interceptors.size()-1; i >= 0; i--) { try { diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java index e180931685..8b6fba962d 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ import org.springframework.web.context.request.NativeWebRequest; * *

A {@code CallableProcessingInterceptor} is invoked before and after the * invocation of the {@code Callable} task in the asynchronous thread, as well - * as on timeout from a container thread, or after completing for any reason + * as on timeout/error from a container thread, or after completing for any reason * including a timeout or network error. * *

As a general rule exceptions raised by interceptor methods will cause @@ -36,7 +36,7 @@ import org.springframework.web.context.request.NativeWebRequest; * the Exception instance as the concurrent result. Such exceptions will then * be processed through the {@code HandlerExceptionResolver} mechanism. * - *

The {@link #handleTimeout(NativeWebRequest, Callable) afterTimeout} method + *

The {@link #handleTimeout(NativeWebRequest, Callable) handleTimeout} method * can select a value to be used to resume processing. * * @author Rossen Stoyanchev @@ -101,6 +101,21 @@ public interface CallableProcessingInterceptor { */ Object handleTimeout(NativeWebRequest request, Callable task) throws Exception; + /** + * Invoked from a container thread when an error occurred while processing the async request + * before the {@code Callable} task completes. Implementations may return a value, + * including an {@link Exception}, to use instead of the value the + * {@link Callable} did not return in time. + * @param request the current request + * @param task the task for the current async request + * @paramt t the error that occurred while request processing + * @return a concurrent result value; if the value is anything other than + * {@link #RESULT_NONE} or {@link #RESPONSE_HANDLED}, concurrent processing + * is resumed and subsequent interceptors are not invoked + * @throws Exception in case of errors + */ + Object handleError(NativeWebRequest request, Callable task, Throwable t) throws Exception; + /** * Invoked from a container thread when async processing completes for any * reason including timeout or network error. diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java index 10451a0668..13d6b9413f 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,6 +60,15 @@ public abstract class CallableProcessingInterceptorAdapter implements CallablePr return RESULT_NONE; } + /** + * This implementation always returns + * {@link CallableProcessingInterceptor#RESULT_NONE RESULT_NONE}. + */ + @Override + public Object handleError(NativeWebRequest request, Callable task, Throwable t) throws Exception { + return RESULT_NONE; + } + /** * This implementation is empty. */ diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java index 5d65839b35..8667871d34 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.web.context.request.async; import java.util.PriorityQueue; import java.util.concurrent.Callable; +import java.util.function.Consumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,6 +62,8 @@ public class DeferredResult { private Runnable timeoutCallback; + private Consumer errorCallback; + private Runnable completionCallback; private DeferredResultHandler resultHandler; @@ -150,6 +153,17 @@ public class DeferredResult { this.timeoutCallback = callback; } + /** + * Register code to invoke when an error occurred while processing the async request. + *

This method is called from a container thread when an error occurred while + * processing an async request before the {@code DeferredResult} has been populated. + * It may invoke {@link DeferredResult#setResult setResult} or + * {@link DeferredResult#setErrorResult setErrorResult} to resume processing. + */ + public void onError(Consumer callback) { + this.errorCallback = callback; + } + /** * Register code to invoke when the async request completes. *

This method is called from a container thread when an async request @@ -275,6 +289,25 @@ public class DeferredResult { return continueProcessing; } @Override + public boolean handleError(NativeWebRequest request, DeferredResult deferredResult, Throwable t) { + boolean continueProcessing = true; + try { + if (errorCallback != null) { + errorCallback.accept(t); + } + } + finally { + continueProcessing = false; + try { + setResultInternal(t); + } + catch (Throwable ex) { + logger.debug("Failed to handle error result", ex); + } + } + return continueProcessing; + } + @Override public void afterCompletion(NativeWebRequest request, DeferredResult deferredResult) { expired = true; if (completionCallback != null) { diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java index 5bbc6dd9b3..58399f87e6 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +78,17 @@ class DeferredResultInterceptorChain { } } + public void triggerAfterError(NativeWebRequest request, DeferredResult deferredResult, Throwable t) throws Exception { + for (DeferredResultProcessingInterceptor interceptor : this.interceptors) { + if (deferredResult.isSetOrExpired()) { + return; + } + if (!interceptor.handleError(request, deferredResult, t)){ + break; + } + } + } + public void triggerAfterCompletion(NativeWebRequest request, DeferredResult deferredResult) { for (int i = this.preProcessingIndex; i >= 0; i--) { try { diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java index 89e5745f85..4dba163b41 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import org.springframework.web.context.request.NativeWebRequest; * *

A {@code DeferredResultProcessingInterceptor} is invoked before the start * of async processing, after the {@code DeferredResult} is set as well as on - * timeout, or after completing for any reason including a timeout or network + * timeout/error, or after completing for any reason including a timeout or network * error. * *

As a general rule exceptions raised by interceptor methods will cause @@ -33,7 +33,7 @@ import org.springframework.web.context.request.NativeWebRequest; * the Exception instance as the concurrent result. Such exceptions will then * be processed through the {@code HandlerExceptionResolver} mechanism. * - *

The {@link #handleTimeout(NativeWebRequest, DeferredResult) afterTimeout} + *

The {@link #handleTimeout(NativeWebRequest, DeferredResult) handleTimeout} * method can set the {@code DeferredResult} in order to resume processing. * * @author Rossen Stoyanchev @@ -94,6 +94,22 @@ public interface DeferredResultProcessingInterceptor { */ boolean handleTimeout(NativeWebRequest request, DeferredResult deferredResult) throws Exception; + /** + * Invoked from a container thread when an error occurred while processing an async request + * before the {@code DeferredResult} has been set. Implementations may invoke + * {@link DeferredResult#setResult(Object) setResult} or + * {@link DeferredResult#setErrorResult(Object) setErrorResult} to resume processing. + * @param request the current request + * @param deferredResult the DeferredResult for the current request; if the + * {@code DeferredResult} is set, then concurrent processing is resumed and + * subsequent interceptors are not invoked + * @param t the error that occurred while request processing + * @return {@code true} if processing should continue, or {@code false} if + * other interceptors should not be invoked + * @throws Exception in case of errors + */ + boolean handleError(NativeWebRequest request, DeferredResult deferredResult, Throwable t) throws Exception; + /** * Invoked from a container thread when an async request completed for any * reason including timeout and network error. This method is useful for diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java index dee7d210ba..1ed5116582 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,6 +60,16 @@ public abstract class DeferredResultProcessingInterceptorAdapter implements Defe return true; } + /** + * This implementation returns {@code true} by default allowing other interceptors + * to be given a chance to handle the error. + */ + @Override + public boolean handleError(NativeWebRequest request, DeferredResult deferredResult, Throwable t) + throws Exception { + return true; + } + /** * This implementation is empty. */ diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/ErrorCallableProcessingInterceptor.java b/spring-web/src/main/java/org/springframework/web/context/request/async/ErrorCallableProcessingInterceptor.java new file mode 100644 index 0000000000..2f7b7f80ca --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/ErrorCallableProcessingInterceptor.java @@ -0,0 +1,37 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.context.request.async; + +import java.util.concurrent.Callable; + +import org.springframework.web.context.request.NativeWebRequest; + +/** + * Registered at the end, after all other interceptors and + * therefore invoked only if no other interceptor handles the error. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class ErrorCallableProcessingInterceptor extends CallableProcessingInterceptorAdapter { + + @Override + public Object handleError(NativeWebRequest request, Callable task, Throwable t) throws Exception { + return t; + } + +} diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/ErrorDeferredResultProcessingInterceptor.java b/spring-web/src/main/java/org/springframework/web/context/request/async/ErrorDeferredResultProcessingInterceptor.java new file mode 100644 index 0000000000..7413493e12 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/ErrorDeferredResultProcessingInterceptor.java @@ -0,0 +1,37 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.context.request.async; + +import org.springframework.web.context.request.NativeWebRequest; + +/** + * Registered at the end, after all other interceptors and + * therefore invoked only if no other interceptor handles the error. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class ErrorDeferredResultProcessingInterceptor extends DeferredResultProcessingInterceptorAdapter { + + @Override + public boolean handleError(NativeWebRequest request, DeferredResult result, Throwable t) + throws Exception { + result.setErrorResult(t); + return false; + } + +} diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java b/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java index 841ebd53cc..dadb0fea31 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; @@ -50,6 +52,8 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements private final List timeoutHandlers = new ArrayList<>(); + private final List> exceptionHandlers = new ArrayList<>(); + private final List completionHandlers = new ArrayList<>(); @@ -78,6 +82,11 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements this.timeoutHandlers.add(timeoutHandler); } + @Override + public void addErrorHandler(Consumer exceptionHandler) { + this.exceptionHandlers.add(exceptionHandler); + } + @Override public void addCompletionHandler(Runnable runnable) { this.completionHandlers.add(runnable); @@ -134,7 +143,9 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements @Override public void onError(AsyncEvent event) throws IOException { - onComplete(event); + for (Consumer handler : this.exceptionHandlers) { + handler.accept(event.getThrowable()); + } } @Override diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java index 4cbb6f515d..420307809a 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java @@ -67,9 +67,15 @@ public final class WebAsyncManager { private static final CallableProcessingInterceptor timeoutCallableInterceptor = new TimeoutCallableProcessingInterceptor(); + private static final CallableProcessingInterceptor errorCallableInterceptor = + new ErrorCallableProcessingInterceptor(); + private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor(); + private static final DeferredResultProcessingInterceptor errorDeferredResultInterceptor = + new ErrorDeferredResultProcessingInterceptor(); + private AsyncWebRequest asyncWebRequest; @@ -281,6 +287,7 @@ public final class WebAsyncManager { interceptors.add(webAsyncTask.getInterceptor()); interceptors.addAll(this.callableInterceptors.values()); interceptors.add(timeoutCallableInterceptor); + interceptors.add(errorCallableInterceptor); final Callable callable = webAsyncTask.getCallable(); final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors); @@ -293,6 +300,14 @@ public final class WebAsyncManager { } }); + this.asyncWebRequest.addErrorHandler(t -> { + logger.debug("Processing error"); + Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, t); + if (result != CallableProcessingInterceptor.RESULT_NONE) { + setConcurrentResultAndDispatch(result); + } + }); + this.asyncWebRequest.addCompletionHandler(() -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable)); @@ -371,6 +386,7 @@ public final class WebAsyncManager { interceptors.add(deferredResult.getInterceptor()); interceptors.addAll(this.deferredResultInterceptors.values()); interceptors.add(timeoutDeferredResultInterceptor); + interceptors.add(errorDeferredResultInterceptor); final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); @@ -383,6 +399,15 @@ public final class WebAsyncManager { } }); + this.asyncWebRequest.addErrorHandler(t -> { + try { + interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, t); + } + catch (Throwable ex) { + setConcurrentResultAndDispatch(ex); + } + }); + this.asyncWebRequest.addCompletionHandler(() -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult)); diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncTask.java b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncTask.java index 002c1d602d..9ef0b89b1e 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncTask.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +46,8 @@ public class WebAsyncTask implements BeanFactoryAware { private Callable timeoutCallback; + private Callable errorCallback; + private Runnable completionCallback; @@ -150,6 +152,18 @@ public class WebAsyncTask implements BeanFactoryAware { this.timeoutCallback = callback; } + /** + * Register code to invoke when an error occurred while processing the async request. + *

This method is called from a container thread when an error occurred while processing + * an async request before the {@code Callable} has completed. The callback is executed in + * the same thread and therefore should return without blocking. It may return + * an alternative value to use, including an {@link Exception} or return + * {@link CallableProcessingInterceptor#RESULT_NONE RESULT_NONE}. + */ + public void onError(Callable callback) { + this.errorCallback = callback; + } + /** * Register code to invoke when the async request completes. *

This method is called from a container thread when an async request @@ -166,6 +180,10 @@ public class WebAsyncTask implements BeanFactoryAware { return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE); } @Override + public Object handleError(NativeWebRequest request, Callable task, Throwable t) throws Exception { + return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE); + } + @Override public void afterCompletion(NativeWebRequest request, Callable task) throws Exception { if (completionCallback != null) { completionCallback.run(); diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java index 62c81cf18a..2b1843efee 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.web.context.request.async; +import java.util.function.Consumer; + import org.junit.Test; import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler; @@ -125,4 +127,27 @@ public class DeferredResultTests { verify(handler).handleResult("timeout result"); } + @Test + public void onError() throws Exception { + final StringBuilder sb = new StringBuilder(); + + DeferredResultHandler handler = mock(DeferredResultHandler.class); + + DeferredResult result = new DeferredResult<>(null, "error result"); + result.setResultHandler(handler); + Exception e = new Exception(); + result.onError(new Consumer() { + @Override + public void accept(Throwable t) { + sb.append("error event"); + } + }); + + result.getInterceptor().handleError(null, null, e); + + assertEquals("error event", sb.toString()); + assertFalse("Should not be able to set result a second time", result.setResult("hello")); + verify(handler).handleResult(e); + } + } diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequestTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequestTests.java index 72a07aceb9..c96588c289 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequestTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequestTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ package org.springframework.web.context.request.async; +import java.util.function.Consumer; + import javax.servlet.AsyncEvent; import org.junit.Before; @@ -128,6 +130,16 @@ public class StandardServletAsyncWebRequestTests { verify(timeoutHandler).run(); } + @SuppressWarnings("unchecked") + @Test + public void onErrorHandler() throws Exception { + Consumer errorHandler = mock(Consumer.class); + this.asyncRequest.addErrorHandler(errorHandler); + Exception e = new Exception(); + this.asyncRequest.onError(new AsyncEvent(new MockAsyncContext(this.request, this.response), e)); + verify(errorHandler).accept(e); + } + @Test(expected = IllegalStateException.class) public void setTimeoutDuringConcurrentHandling() { this.asyncRequest.startAsync(); @@ -148,13 +160,26 @@ public class StandardServletAsyncWebRequestTests { // SPR-13292 + @SuppressWarnings("unchecked") @Test - public void onCompletionHandlerAfterOnErrorEvent() throws Exception { + public void onErrorHandlerAfterOnErrorEvent() throws Exception { + Consumer handler = mock(Consumer.class); + this.asyncRequest.addErrorHandler(handler); + + this.asyncRequest.startAsync(); + Exception e = new Exception(); + this.asyncRequest.onError(new AsyncEvent(this.request.getAsyncContext(), e)); + + verify(handler).accept(e); + } + + @Test + public void onCompletionHandlerAfterOnCompleteEvent() throws Exception { Runnable handler = mock(Runnable.class); this.asyncRequest.addCompletionHandler(handler); this.asyncRequest.startAsync(); - this.asyncRequest.onError(new AsyncEvent(this.request.getAsyncContext())); + this.asyncRequest.onComplete(new AsyncEvent(this.request.getAsyncContext())); verify(handler).run(); assertTrue(this.asyncRequest.isAsyncComplete()); diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerErrorTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerErrorTests.java new file mode 100644 index 0000000000..81a84d0978 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerErrorTests.java @@ -0,0 +1,280 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.context.request.async; + +import java.util.concurrent.Callable; +import java.util.function.Consumer; + +import javax.servlet.AsyncEvent; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.mock.web.test.MockAsyncContext; +import org.springframework.mock.web.test.MockHttpServletRequest; +import org.springframework.mock.web.test.MockHttpServletResponse; +import org.springframework.web.context.request.NativeWebRequest; + +import static org.junit.Assert.*; +import static org.mockito.BDDMockito.*; +import static org.springframework.web.context.request.async.CallableProcessingInterceptor.*; + +/** + * {@link WebAsyncManager} tests where container-triggered error/completion + * events are simulated. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class WebAsyncManagerErrorTests { + + private WebAsyncManager asyncManager; + + private StandardServletAsyncWebRequest asyncWebRequest; + + private MockHttpServletRequest servletRequest; + + private MockHttpServletResponse servletResponse; + + + @Before + public void setup() { + this.servletRequest = new MockHttpServletRequest("GET", "/test"); + this.servletRequest.setAsyncSupported(true); + this.servletResponse = new MockHttpServletResponse(); + this.asyncWebRequest = new StandardServletAsyncWebRequest(servletRequest, servletResponse); + + AsyncTaskExecutor executor = mock(AsyncTaskExecutor.class); + + this.asyncManager = WebAsyncUtils.getAsyncManager(servletRequest); + this.asyncManager.setTaskExecutor(executor); + this.asyncManager.setAsyncWebRequest(this.asyncWebRequest); + } + + + @Test + public void startCallableProcessingErrorAndComplete() throws Exception { + StubCallable callable = new StubCallable(); + + CallableProcessingInterceptor interceptor = mock(CallableProcessingInterceptor.class); + Exception e = new Exception(); + given(interceptor.handleError(this.asyncWebRequest, callable, e)).willReturn(RESULT_NONE); + + this.asyncManager.registerCallableInterceptor("interceptor", interceptor); + this.asyncManager.startCallableProcessing(callable); + + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + this.asyncWebRequest.onComplete(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(e, this.asyncManager.getConcurrentResult()); + + verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable); + verify(interceptor).afterCompletion(this.asyncWebRequest, callable); + } + + @Test + public void startCallableProcessingErrorAndResumeThroughCallback() throws Exception { + + StubCallable callable = new StubCallable(); + WebAsyncTask webAsyncTask = new WebAsyncTask<>(callable); + webAsyncTask.onError(new Callable() { + @Override + public Object call() throws Exception { + return 7; + } + }); + + this.asyncManager.startCallableProcessing(webAsyncTask); + + Exception e = new Exception(); + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(7, this.asyncManager.getConcurrentResult()); + assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()); + } + + @Test + public void startCallableProcessingErrorAndResumeThroughInterceptor() throws Exception { + + StubCallable callable = new StubCallable(); + + CallableProcessingInterceptor interceptor = mock(CallableProcessingInterceptor.class); + Exception e = new Exception(); + given(interceptor.handleError(this.asyncWebRequest, callable, e)).willReturn(22); + + this.asyncManager.registerCallableInterceptor("errorInterceptor", interceptor); + this.asyncManager.startCallableProcessing(callable); + + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(22, this.asyncManager.getConcurrentResult()); + assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()); + + verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable); + } + + @Test + public void startCallableProcessingAfterException() throws Exception { + + StubCallable callable = new StubCallable(); + Exception exception = new Exception(); + + CallableProcessingInterceptor interceptor = mock(CallableProcessingInterceptor.class); + Exception e = new Exception(); + given(interceptor.handleError(this.asyncWebRequest, callable, e)).willThrow(exception); + + this.asyncManager.registerCallableInterceptor("errorInterceptor", interceptor); + this.asyncManager.startCallableProcessing(callable); + + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(exception, this.asyncManager.getConcurrentResult()); + assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()); + + verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable); + } + + @Test + public void startDeferredResultProcessingErrorAndComplete() throws Exception { + + DeferredResult deferredResult = new DeferredResult<>(); + + DeferredResultProcessingInterceptor interceptor = mock(DeferredResultProcessingInterceptor.class); + Exception e = new Exception(); + given(interceptor.handleError(this.asyncWebRequest, deferredResult, e)).willReturn(true); + + this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor); + this.asyncManager.startDeferredResultProcessing(deferredResult); + + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + this.asyncWebRequest.onComplete(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(e, this.asyncManager.getConcurrentResult()); + + verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, deferredResult); + verify(interceptor).preProcess(this.asyncWebRequest, deferredResult); + verify(interceptor).afterCompletion(this.asyncWebRequest, deferredResult); + } + + @Test + public void startDeferredResultProcessingErrorAndResumeWithDefaultResult() throws Exception { + + Exception e = new Exception(); + DeferredResult deferredResult = new DeferredResult<>(null, e); + this.asyncManager.startDeferredResultProcessing(deferredResult); + + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(e, this.asyncManager.getConcurrentResult()); + assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()); + } + + @Test + public void startDeferredResultProcessingErrorAndResumeThroughCallback() throws Exception { + + final DeferredResult deferredResult = new DeferredResult<>(); + deferredResult.onError(new Consumer() { + @Override + public void accept(Throwable t) { + deferredResult.setResult(t); + } + }); + + this.asyncManager.startDeferredResultProcessing(deferredResult); + + Exception e = new Exception(); + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(e, this.asyncManager.getConcurrentResult()); + assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()); + } + + @Test + public void startDeferredResultProcessingErrorAndResumeThroughInterceptor() throws Exception { + + DeferredResult deferredResult = new DeferredResult<>(); + + DeferredResultProcessingInterceptor interceptor = new DeferredResultProcessingInterceptorAdapter() { + @Override + public boolean handleError(NativeWebRequest request, DeferredResult result, Throwable t) + throws Exception { + result.setErrorResult(t); + return true; + } + }; + + this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor); + this.asyncManager.startDeferredResultProcessing(deferredResult); + + Exception e = new Exception(); + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(e, this.asyncManager.getConcurrentResult()); + assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()); + } + + @Test + public void startDeferredResultProcessingAfterException() throws Exception { + + DeferredResult deferredResult = new DeferredResult<>(); + final Exception exception = new Exception(); + + DeferredResultProcessingInterceptor interceptor = new DeferredResultProcessingInterceptorAdapter() { + @Override + public boolean handleError(NativeWebRequest request, DeferredResult result, Throwable t) + throws Exception { + throw exception; + } + }; + + this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor); + this.asyncManager.startDeferredResultProcessing(deferredResult); + + Exception e = new Exception(); + AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e); + this.asyncWebRequest.onError(event); + + assertTrue(this.asyncManager.hasConcurrentResult()); + assertEquals(e, this.asyncManager.getConcurrentResult()); + assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()); + } + + + private final class StubCallable implements Callable { + @Override + public Object call() throws Exception { + return 21; + } + } + +} diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java index 0d805d7871..b02fb64615 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java @@ -17,6 +17,8 @@ package org.springframework.web.context.request.async; import java.util.concurrent.Callable; +import java.util.function.Consumer; + import javax.servlet.http.HttpServletRequest; import org.junit.Before; @@ -138,6 +140,7 @@ public class WebAsyncManagerTests { verify(interceptor).postProcess(this.asyncWebRequest, task, concurrentResult); } + @SuppressWarnings("unchecked") @Test public void startCallableProcessingBeforeConcurrentHandlingException() throws Exception { Callable task = new StubCallable(21); @@ -159,6 +162,7 @@ public class WebAsyncManagerTests { assertFalse(this.asyncManager.hasConcurrentResult()); verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull()); + verify(this.asyncWebRequest).addErrorHandler((Consumer) notNull()); verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull()); } @@ -228,18 +232,19 @@ public class WebAsyncManagerTests { verify(interceptor2).preProcess(this.asyncWebRequest, task); } + @SuppressWarnings("unchecked") @Test public void startCallableProcessingWithAsyncTask() throws Exception { AsyncTaskExecutor executor = mock(AsyncTaskExecutor.class); given(this.asyncWebRequest.getNativeRequest(HttpServletRequest.class)).willReturn(this.servletRequest); - @SuppressWarnings("unchecked") WebAsyncTask asyncTask = new WebAsyncTask<>(1000L, executor, mock(Callable.class)); this.asyncManager.startCallableProcessing(asyncTask); verify(executor).submit((Runnable) notNull()); verify(this.asyncWebRequest).setTimeout(1000L); verify(this.asyncWebRequest).addTimeoutHandler(any(Runnable.class)); + verify(this.asyncWebRequest).addErrorHandler(any(Consumer.class)); verify(this.asyncWebRequest).addCompletionHandler(any(Runnable.class)); verify(this.asyncWebRequest).startAsync(); } @@ -277,6 +282,7 @@ public class WebAsyncManagerTests { verify(this.asyncWebRequest).setTimeout(1000L); } + @SuppressWarnings("unchecked") @Test public void startDeferredResultProcessingBeforeConcurrentHandlingException() throws Exception { DeferredResult deferredResult = new DeferredResult<>(); @@ -298,6 +304,7 @@ public class WebAsyncManagerTests { assertFalse(this.asyncManager.hasConcurrentResult()); verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull()); + verify(this.asyncWebRequest).addErrorHandler((Consumer) notNull()); verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull()); } @@ -359,8 +366,10 @@ public class WebAsyncManagerTests { given(this.asyncWebRequest.isAsyncComplete()).willReturn(false); } + @SuppressWarnings("unchecked") private void verifyDefaultAsyncScenario() { verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull()); + verify(this.asyncWebRequest).addErrorHandler((Consumer) notNull()); verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull()); verify(this.asyncWebRequest).startAsync(); verify(this.asyncWebRequest).dispatch(); diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java index 588558c819..250fab7298 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java @@ -219,6 +219,7 @@ class ReactiveTypeHandler { terminate(); this.emitter.complete(); }); + this.emitter.onError(t -> this.emitter.completeWithError(t)); subscription.request(1); } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index 260d6d14d2..6a8cc2dbaa 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -19,6 +19,7 @@ package org.springframework.web.servlet.mvc.method.annotation; import java.io.IOException; import java.util.LinkedHashSet; import java.util.Set; +import java.util.function.Consumer; import org.springframework.http.MediaType; import org.springframework.http.server.ServerHttpResponse; @@ -74,6 +75,8 @@ public class ResponseBodyEmitter { private final DefaultCallback timeoutCallback = new DefaultCallback(); + private final ErrorCallback errorCallback = new ErrorCallback(); + private final DefaultCallback completionCallback = new DefaultCallback(); @@ -123,6 +126,7 @@ public class ResponseBodyEmitter { } else { this.handler.onTimeout(this.timeoutCallback); + this.handler.onError(this.errorCallback); this.handler.onCompletion(this.completionCallback); } } @@ -168,11 +172,9 @@ public class ResponseBodyEmitter { this.handler.send(object, mediaType); } catch (IOException ex) { - completeWithError(ex); throw ex; } catch (Throwable ex) { - completeWithError(ex); throw new IllegalStateException("Failed to send " + object, ex); } } @@ -214,6 +216,15 @@ public class ResponseBodyEmitter { this.timeoutCallback.setDelegate(callback); } + /** + * Register code to invoke when an error occurred while processing the async request. + * This method is called from a container thread when an error occurred while processing + * an async request. + */ + public synchronized void onError(Consumer callback) { + this.errorCallback.setDelegate(callback); + } + /** * Register code to invoke when the async request completes. This method is * called from a container thread when an async request completed for any @@ -244,6 +255,8 @@ public class ResponseBodyEmitter { void onTimeout(Runnable callback); + void onError(Consumer callback); + void onCompletion(Runnable callback); } @@ -291,4 +304,22 @@ public class ResponseBodyEmitter { } } + + private class ErrorCallback implements Consumer { + + private Consumer delegate; + + public void setDelegate(Consumer callback) { + this.delegate = callback; + } + + @Override + public void accept(Throwable t) { + ResponseBodyEmitter.this.complete = true; + if (this.delegate != null) { + this.delegate.accept(t); + } + } + } + } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java index 9653170e5e..162795340a 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java @@ -19,6 +19,8 @@ package org.springframework.web.servlet.mvc.method.annotation; import java.io.IOException; import java.io.OutputStream; import java.util.List; +import java.util.function.Consumer; + import javax.servlet.ServletRequest; import javax.servlet.http.HttpServletResponse; @@ -217,6 +219,11 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur this.deferredResult.onTimeout(callback); } + @Override + public void onError(Consumer callback) { + this.deferredResult.onError(callback); + } + @Override public void onCompletion(Runnable callback) { this.deferredResult.onCompletion(callback); diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index 82b9feec7a..ad368fc241 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.junit.Before; @@ -380,6 +381,10 @@ public class ReactiveTypeHandlerTests { public void onTimeout(Runnable callback) { } + @Override + public void onError(Consumer callback) { + } + @Override public void onCompletion(Runnable callback) { } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java index 5e29fe1042..7903239575 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.junit.Before; import org.junit.Test; @@ -175,6 +176,25 @@ public class ResponseBodyEmitterReturnValueHandlerTests { verify(asyncWebRequest).startAsync(); } + @SuppressWarnings("unchecked") + @Test + public void responseBodyEmitterWithErrorValue() throws Exception { + + AsyncWebRequest asyncWebRequest = mock(AsyncWebRequest.class); + WebAsyncUtils.getAsyncManager(this.request).setAsyncWebRequest(asyncWebRequest); + + ResponseBodyEmitter emitter = new ResponseBodyEmitter(19000L); + emitter.onError(mock(Consumer.class)); + emitter.onCompletion(mock(Runnable.class)); + + MethodParameter type = on(TestController.class).resolveReturnType(ResponseBodyEmitter.class); + this.handler.handleReturnValue(emitter, type, this.mavContainer, this.webRequest); + + verify(asyncWebRequest).addErrorHandler(any(Consumer.class)); + verify(asyncWebRequest, times(2)).addCompletionHandler(any(Runnable.class)); + verify(asyncWebRequest).startAsync(); + } + @Test public void sseEmitter() throws Exception { MethodParameter type = on(TestController.class).resolveReturnType(SseEmitter.class); diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java index 3709a5e8ed..bd6a65c6b3 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java @@ -102,6 +102,7 @@ public class ResponseBodyEmitterTests { public void sendAfterHandlerInitialized() throws Exception { this.emitter.initialize(this.handler); verify(this.handler).onTimeout(any()); + verify(this.handler).onError(any()); verify(this.handler).onCompletion(any()); verifyNoMoreInteractions(this.handler); @@ -119,6 +120,7 @@ public class ResponseBodyEmitterTests { public void sendAfterHandlerInitializedWithError() throws Exception { this.emitter.initialize(this.handler); verify(this.handler).onTimeout(any()); + verify(this.handler).onError(any()); verify(this.handler).onCompletion(any()); verifyNoMoreInteractions(this.handler); @@ -137,6 +139,7 @@ public class ResponseBodyEmitterTests { public void sendWithError() throws Exception { this.emitter.initialize(this.handler); verify(this.handler).onTimeout(any()); + verify(this.handler).onError(any()); verify(this.handler).onCompletion(any()); verifyNoMoreInteractions(this.handler); @@ -150,7 +153,6 @@ public class ResponseBodyEmitterTests { // expected } verify(this.handler).send("foo", MediaType.TEXT_PLAIN); - verify(this.handler).completeWithError(failure); verifyNoMoreInteractions(this.handler); } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java index e1a7cc1c10..d5cae7818c 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.web.servlet.mvc.method.annotation; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import org.junit.Before; import org.junit.Test; @@ -152,6 +153,10 @@ public class SseEmitterTests { public void onTimeout(Runnable callback) { } + @Override + public void onError(Consumer callback) { + } + @Override public void onCompletion(Runnable callback) { }