Add onError callback to DeferredResult

Issue: SPR-15614
This commit is contained in:
Violeta Georgieva 2017-06-23 16:48:40 +03:00 committed by Rossen Stoyanchev
parent 140542e8b1
commit e0678ba583
27 changed files with 771 additions and 41 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,7 +36,7 @@ import org.springframework.web.context.request.async.DeferredResultProcessingInt
*
* Ensures the following:
* 1) The session is bound/unbound when "callable processing" is started
* 2) The session is closed if an async request times out
* 2) The session is closed if an async request times out or an error occurred
*
* @author Rossen Stoyanchev
* @since 4.2
@ -51,6 +51,8 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
private volatile boolean timeoutInProgress;
private volatile boolean errorInProgress;
public AsyncRequestInterceptor(SessionFactory sessionFactory, SessionHolder sessionHolder) {
this.sessionFactory = sessionFactory;
@ -65,6 +67,7 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
public void bindSession() {
this.timeoutInProgress = false;
this.errorInProgress = false;
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.sessionHolder);
}
@ -80,13 +83,19 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
}
@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
closeAfterTimeout();
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) {
this.errorInProgress = true;
return RESULT_NONE; // give other interceptors a chance to handle the error
}
private void closeAfterTimeout() {
if (this.timeoutInProgress) {
logger.debug("Closing Hibernate Session after async request timeout");
@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
closeSession();
}
private void closeSession() {
if (this.timeoutInProgress || this.errorInProgress) {
logger.debug("Closing Hibernate Session after async request timeout/error");
SessionFactoryUtils.closeSession(this.sessionHolder.getSession());
}
}
@ -112,9 +121,15 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
return true; // give other interceptors a chance to handle the timeout
}
@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> deferredResult, Throwable t) {
this.errorInProgress = true;
return true; // give other interceptors a chance to handle the error
}
@Override
public <T> void afterCompletion(NativeWebRequest request, DeferredResult<T> deferredResult) {
closeAfterTimeout();
closeSession();
}
}

View File

@ -36,7 +36,7 @@ import org.springframework.web.context.request.async.DeferredResultProcessingInt
*
* Ensures the following:
* 1) The session is bound/unbound when "callable processing" is started
* 2) The session is closed if an async request times out
* 2) The session is closed if an async request times out or an error occurred
*
* @author Rossen Stoyanchev
* @since 3.2.5
@ -51,6 +51,8 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
private volatile boolean timeoutInProgress;
private volatile boolean errorInProgress;
public AsyncRequestInterceptor(EntityManagerFactory emFactory, EntityManagerHolder emHolder) {
this.emFactory = emFactory;
@ -65,6 +67,7 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
public void bindEntityManager() {
this.timeoutInProgress = false;
this.errorInProgress = false;
TransactionSynchronizationManager.bindResource(this.emFactory, this.emHolder);
}
@ -80,13 +83,19 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
}
@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
closeAfterTimeout();
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) {
this.errorInProgress = true;
return RESULT_NONE; // give other interceptors a chance to handle the error
}
private void closeAfterTimeout() {
if (this.timeoutInProgress) {
logger.debug("Closing JPA EntityManager after async request timeout");
@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
closeEntityManager();
}
private void closeEntityManager() {
if (this.timeoutInProgress || this.errorInProgress) {
logger.debug("Closing JPA EntityManager after async request timeout/error");
EntityManagerFactoryUtils.closeEntityManager(emHolder.getEntityManager());
}
}
@ -112,9 +121,15 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple
return true; // give other interceptors a chance to handle the timeout
}
@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> deferredResult, Throwable t) {
this.errorInProgress = true;
return true; // give other interceptors a chance to handle the error
}
@Override
public <T> void afterCompletion(NativeWebRequest request, DeferredResult<T> deferredResult) {
closeAfterTimeout();
closeEntityManager();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -228,6 +228,48 @@ public class OpenEntityManagerInViewTests {
verify(this.manager).close();
}
@Test
public void testOpenEntityManagerInViewInterceptorAsyncErrorScenario() throws Exception {
// Initial request thread
OpenEntityManagerInViewInterceptor interceptor = new OpenEntityManagerInViewInterceptor();
interceptor.setEntityManagerFactory(factory);
given(this.factory.createEntityManager()).willReturn(this.manager);
interceptor.preHandle(this.webRequest);
assertTrue(TransactionSynchronizationManager.hasResource(this.factory));
AsyncWebRequest asyncWebRequest = new StandardServletAsyncWebRequest(this.request, this.response);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.request);
asyncManager.setTaskExecutor(new SyncTaskExecutor());
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.startCallableProcessing(new Callable<String>() {
@Override
public String call() throws Exception {
return "anything";
}
});
interceptor.afterConcurrentHandlingStarted(this.webRequest);
assertFalse(TransactionSynchronizationManager.hasResource(this.factory));
// Async request timeout
given(this.manager.isOpen()).willReturn(true);
MockAsyncContext asyncContext = (MockAsyncContext) this.request.getAsyncContext();
for (AsyncListener listener : asyncContext.getListeners()) {
listener.onError(new AsyncEvent(asyncContext, new Exception()));
}
for (AsyncListener listener : asyncContext.getListeners()) {
listener.onComplete(new AsyncEvent(asyncContext));
}
verify(this.manager).close();
}
@Test
public void testOpenEntityManagerInViewFilter() throws Exception {
given(manager.isOpen()).willReturn(true);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.web.context.request.async;
import java.util.function.Consumer;
import org.springframework.lang.Nullable;
import org.springframework.web.context.request.NativeWebRequest;
@ -42,7 +44,13 @@ public interface AsyncWebRequest extends NativeWebRequest {
void addTimeoutHandler(Runnable runnable);
/**
* Add a handle to invoke when request processing completes.
* Add a handler to invoke when an error occurred while concurrent
* handling of a request.
*/
void addErrorHandler(Consumer<Throwable> exceptionHandler);
/**
* Add a handler to invoke when request processing completes.
*/
void addCompletionHandler(Runnable runnable);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -94,6 +94,24 @@ class CallableInterceptorChain {
return CallableProcessingInterceptor.RESULT_NONE;
}
public Object triggerAfterError(NativeWebRequest request, Callable<?> task, Throwable throwable) {
for (CallableProcessingInterceptor interceptor : this.interceptors) {
try {
Object result = interceptor.handleError(request, task, throwable);
if (result == CallableProcessingInterceptor.RESPONSE_HANDLED) {
break;
}
else if (result != CallableProcessingInterceptor.RESULT_NONE) {
return result;
}
}
catch (Throwable t) {
return t;
}
}
return CallableProcessingInterceptor.RESULT_NONE;
}
public void triggerAfterCompletion(NativeWebRequest request, Callable<?> task) {
for (int i = this.interceptors.size()-1; i >= 0; i--) {
try {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,7 +28,7 @@ import org.springframework.web.context.request.NativeWebRequest;
*
* <p>A {@code CallableProcessingInterceptor} is invoked before and after the
* invocation of the {@code Callable} task in the asynchronous thread, as well
* as on timeout from a container thread, or after completing for any reason
* as on timeout/error from a container thread, or after completing for any reason
* including a timeout or network error.
*
* <p>As a general rule exceptions raised by interceptor methods will cause
@ -36,7 +36,7 @@ import org.springframework.web.context.request.NativeWebRequest;
* the Exception instance as the concurrent result. Such exceptions will then
* be processed through the {@code HandlerExceptionResolver} mechanism.
*
* <p>The {@link #handleTimeout(NativeWebRequest, Callable) afterTimeout} method
* <p>The {@link #handleTimeout(NativeWebRequest, Callable) handleTimeout} method
* can select a value to be used to resume processing.
*
* @author Rossen Stoyanchev
@ -101,6 +101,21 @@ public interface CallableProcessingInterceptor {
*/
<T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception;
/**
* Invoked from a container thread when an error occurred while processing the async request
* before the {@code Callable} task completes. Implementations may return a value,
* including an {@link Exception}, to use instead of the value the
* {@link Callable} did not return in time.
* @param request the current request
* @param task the task for the current async request
* @paramt t the error that occurred while request processing
* @return a concurrent result value; if the value is anything other than
* {@link #RESULT_NONE} or {@link #RESPONSE_HANDLED}, concurrent processing
* is resumed and subsequent interceptors are not invoked
* @throws Exception in case of errors
*/
<T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception;
/**
* Invoked from a container thread when async processing completes for any
* reason including timeout or network error.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -60,6 +60,15 @@ public abstract class CallableProcessingInterceptorAdapter implements CallablePr
return RESULT_NONE;
}
/**
* This implementation always returns
* {@link CallableProcessingInterceptor#RESULT_NONE RESULT_NONE}.
*/
@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
return RESULT_NONE;
}
/**
* This implementation is empty.
*/

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.web.context.request.async;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -61,6 +62,8 @@ public class DeferredResult<T> {
private Runnable timeoutCallback;
private Consumer<Throwable> errorCallback;
private Runnable completionCallback;
private DeferredResultHandler resultHandler;
@ -150,6 +153,17 @@ public class DeferredResult<T> {
this.timeoutCallback = callback;
}
/**
* Register code to invoke when an error occurred while processing the async request.
* <p>This method is called from a container thread when an error occurred while
* processing an async request before the {@code DeferredResult} has been populated.
* It may invoke {@link DeferredResult#setResult setResult} or
* {@link DeferredResult#setErrorResult setErrorResult} to resume processing.
*/
public void onError(Consumer<Throwable> callback) {
this.errorCallback = callback;
}
/**
* Register code to invoke when the async request completes.
* <p>This method is called from a container thread when an async request
@ -275,6 +289,25 @@ public class DeferredResult<T> {
return continueProcessing;
}
@Override
public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) {
boolean continueProcessing = true;
try {
if (errorCallback != null) {
errorCallback.accept(t);
}
}
finally {
continueProcessing = false;
try {
setResultInternal(t);
}
catch (Throwable ex) {
logger.debug("Failed to handle error result", ex);
}
}
return continueProcessing;
}
@Override
public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) {
expired = true;
if (completionCallback != null) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -78,6 +78,17 @@ class DeferredResultInterceptorChain {
}
}
public void triggerAfterError(NativeWebRequest request, DeferredResult<?> deferredResult, Throwable t) throws Exception {
for (DeferredResultProcessingInterceptor interceptor : this.interceptors) {
if (deferredResult.isSetOrExpired()) {
return;
}
if (!interceptor.handleError(request, deferredResult, t)){
break;
}
}
}
public void triggerAfterCompletion(NativeWebRequest request, DeferredResult<?> deferredResult) {
for (int i = this.preProcessingIndex; i >= 0; i--) {
try {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,7 +25,7 @@ import org.springframework.web.context.request.NativeWebRequest;
*
* <p>A {@code DeferredResultProcessingInterceptor} is invoked before the start
* of async processing, after the {@code DeferredResult} is set as well as on
* timeout, or after completing for any reason including a timeout or network
* timeout/error, or after completing for any reason including a timeout or network
* error.
*
* <p>As a general rule exceptions raised by interceptor methods will cause
@ -33,7 +33,7 @@ import org.springframework.web.context.request.NativeWebRequest;
* the Exception instance as the concurrent result. Such exceptions will then
* be processed through the {@code HandlerExceptionResolver} mechanism.
*
* <p>The {@link #handleTimeout(NativeWebRequest, DeferredResult) afterTimeout}
* <p>The {@link #handleTimeout(NativeWebRequest, DeferredResult) handleTimeout}
* method can set the {@code DeferredResult} in order to resume processing.
*
* @author Rossen Stoyanchev
@ -94,6 +94,22 @@ public interface DeferredResultProcessingInterceptor {
*/
<T> boolean handleTimeout(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;
/**
* Invoked from a container thread when an error occurred while processing an async request
* before the {@code DeferredResult} has been set. Implementations may invoke
* {@link DeferredResult#setResult(Object) setResult} or
* {@link DeferredResult#setErrorResult(Object) setErrorResult} to resume processing.
* @param request the current request
* @param deferredResult the DeferredResult for the current request; if the
* {@code DeferredResult} is set, then concurrent processing is resumed and
* subsequent interceptors are not invoked
* @param t the error that occurred while request processing
* @return {@code true} if processing should continue, or {@code false} if
* other interceptors should not be invoked
* @throws Exception in case of errors
*/
<T> boolean handleError(NativeWebRequest request, DeferredResult<T> deferredResult, Throwable t) throws Exception;
/**
* Invoked from a container thread when an async request completed for any
* reason including timeout and network error. This method is useful for

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -60,6 +60,16 @@ public abstract class DeferredResultProcessingInterceptorAdapter implements Defe
return true;
}
/**
* This implementation returns {@code true} by default allowing other interceptors
* to be given a chance to handle the error.
*/
@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> deferredResult, Throwable t)
throws Exception {
return true;
}
/**
* This implementation is empty.
*/

View File

@ -0,0 +1,37 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.context.request.async;
import java.util.concurrent.Callable;
import org.springframework.web.context.request.NativeWebRequest;
/**
* Registered at the end, after all other interceptors and
* therefore invoked only if no other interceptor handles the error.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class ErrorCallableProcessingInterceptor extends CallableProcessingInterceptorAdapter {
@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
return t;
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.context.request.async;
import org.springframework.web.context.request.NativeWebRequest;
/**
* Registered at the end, after all other interceptors and
* therefore invoked only if no other interceptor handles the error.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class ErrorDeferredResultProcessingInterceptor extends DeferredResultProcessingInterceptorAdapter {
@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> result, Throwable t)
throws Exception {
result.setErrorResult(t);
return false;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@ -50,6 +52,8 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
private final List<Runnable> timeoutHandlers = new ArrayList<>();
private final List<Consumer<Throwable>> exceptionHandlers = new ArrayList<>();
private final List<Runnable> completionHandlers = new ArrayList<>();
@ -78,6 +82,11 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
this.timeoutHandlers.add(timeoutHandler);
}
@Override
public void addErrorHandler(Consumer<Throwable> exceptionHandler) {
this.exceptionHandlers.add(exceptionHandler);
}
@Override
public void addCompletionHandler(Runnable runnable) {
this.completionHandlers.add(runnable);
@ -134,7 +143,9 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
@Override
public void onError(AsyncEvent event) throws IOException {
onComplete(event);
for (Consumer<Throwable> handler : this.exceptionHandlers) {
handler.accept(event.getThrowable());
}
}
@Override

View File

@ -67,9 +67,15 @@ public final class WebAsyncManager {
private static final CallableProcessingInterceptor timeoutCallableInterceptor =
new TimeoutCallableProcessingInterceptor();
private static final CallableProcessingInterceptor errorCallableInterceptor =
new ErrorCallableProcessingInterceptor();
private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
new TimeoutDeferredResultProcessingInterceptor();
private static final DeferredResultProcessingInterceptor errorDeferredResultInterceptor =
new ErrorDeferredResultProcessingInterceptor();
private AsyncWebRequest asyncWebRequest;
@ -281,6 +287,7 @@ public final class WebAsyncManager {
interceptors.add(webAsyncTask.getInterceptor());
interceptors.addAll(this.callableInterceptors.values());
interceptors.add(timeoutCallableInterceptor);
interceptors.add(errorCallableInterceptor);
final Callable<?> callable = webAsyncTask.getCallable();
final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
@ -293,6 +300,14 @@ public final class WebAsyncManager {
}
});
this.asyncWebRequest.addErrorHandler(t -> {
logger.debug("Processing error");
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, t);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
setConcurrentResultAndDispatch(result);
}
});
this.asyncWebRequest.addCompletionHandler(() ->
interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));
@ -371,6 +386,7 @@ public final class WebAsyncManager {
interceptors.add(deferredResult.getInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
interceptors.add(errorDeferredResultInterceptor);
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
@ -383,6 +399,15 @@ public final class WebAsyncManager {
}
});
this.asyncWebRequest.addErrorHandler(t -> {
try {
interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, t);
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
});
this.asyncWebRequest.addCompletionHandler(()
-> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -46,6 +46,8 @@ public class WebAsyncTask<V> implements BeanFactoryAware {
private Callable<V> timeoutCallback;
private Callable<V> errorCallback;
private Runnable completionCallback;
@ -150,6 +152,18 @@ public class WebAsyncTask<V> implements BeanFactoryAware {
this.timeoutCallback = callback;
}
/**
* Register code to invoke when an error occurred while processing the async request.
* <p>This method is called from a container thread when an error occurred while processing
* an async request before the {@code Callable} has completed. The callback is executed in
* the same thread and therefore should return without blocking. It may return
* an alternative value to use, including an {@link Exception} or return
* {@link CallableProcessingInterceptor#RESULT_NONE RESULT_NONE}.
*/
public void onError(Callable<V> callback) {
this.errorCallback = callback;
}
/**
* Register code to invoke when the async request completes.
* <p>This method is called from a container thread when an async request
@ -166,6 +180,10 @@ public class WebAsyncTask<V> implements BeanFactoryAware {
return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
}
@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
}
@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
if (completionCallback != null) {
completionCallback.run();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.web.context.request.async;
import java.util.function.Consumer;
import org.junit.Test;
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
@ -125,4 +127,27 @@ public class DeferredResultTests {
verify(handler).handleResult("timeout result");
}
@Test
public void onError() throws Exception {
final StringBuilder sb = new StringBuilder();
DeferredResultHandler handler = mock(DeferredResultHandler.class);
DeferredResult<String> result = new DeferredResult<>(null, "error result");
result.setResultHandler(handler);
Exception e = new Exception();
result.onError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
sb.append("error event");
}
});
result.getInterceptor().handleError(null, null, e);
assertEquals("error event", sb.toString());
assertFalse("Should not be able to set result a second time", result.setResult("hello"));
verify(handler).handleResult(e);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,8 @@
package org.springframework.web.context.request.async;
import java.util.function.Consumer;
import javax.servlet.AsyncEvent;
import org.junit.Before;
@ -128,6 +130,16 @@ public class StandardServletAsyncWebRequestTests {
verify(timeoutHandler).run();
}
@SuppressWarnings("unchecked")
@Test
public void onErrorHandler() throws Exception {
Consumer<Throwable> errorHandler = mock(Consumer.class);
this.asyncRequest.addErrorHandler(errorHandler);
Exception e = new Exception();
this.asyncRequest.onError(new AsyncEvent(new MockAsyncContext(this.request, this.response), e));
verify(errorHandler).accept(e);
}
@Test(expected = IllegalStateException.class)
public void setTimeoutDuringConcurrentHandling() {
this.asyncRequest.startAsync();
@ -148,13 +160,26 @@ public class StandardServletAsyncWebRequestTests {
// SPR-13292
@SuppressWarnings("unchecked")
@Test
public void onCompletionHandlerAfterOnErrorEvent() throws Exception {
public void onErrorHandlerAfterOnErrorEvent() throws Exception {
Consumer<Throwable> handler = mock(Consumer.class);
this.asyncRequest.addErrorHandler(handler);
this.asyncRequest.startAsync();
Exception e = new Exception();
this.asyncRequest.onError(new AsyncEvent(this.request.getAsyncContext(), e));
verify(handler).accept(e);
}
@Test
public void onCompletionHandlerAfterOnCompleteEvent() throws Exception {
Runnable handler = mock(Runnable.class);
this.asyncRequest.addCompletionHandler(handler);
this.asyncRequest.startAsync();
this.asyncRequest.onError(new AsyncEvent(this.request.getAsyncContext()));
this.asyncRequest.onComplete(new AsyncEvent(this.request.getAsyncContext()));
verify(handler).run();
assertTrue(this.asyncRequest.isAsyncComplete());

View File

@ -0,0 +1,280 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.context.request.async;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import javax.servlet.AsyncEvent;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.mock.web.test.MockAsyncContext;
import org.springframework.mock.web.test.MockHttpServletRequest;
import org.springframework.mock.web.test.MockHttpServletResponse;
import org.springframework.web.context.request.NativeWebRequest;
import static org.junit.Assert.*;
import static org.mockito.BDDMockito.*;
import static org.springframework.web.context.request.async.CallableProcessingInterceptor.*;
/**
* {@link WebAsyncManager} tests where container-triggered error/completion
* events are simulated.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class WebAsyncManagerErrorTests {
private WebAsyncManager asyncManager;
private StandardServletAsyncWebRequest asyncWebRequest;
private MockHttpServletRequest servletRequest;
private MockHttpServletResponse servletResponse;
@Before
public void setup() {
this.servletRequest = new MockHttpServletRequest("GET", "/test");
this.servletRequest.setAsyncSupported(true);
this.servletResponse = new MockHttpServletResponse();
this.asyncWebRequest = new StandardServletAsyncWebRequest(servletRequest, servletResponse);
AsyncTaskExecutor executor = mock(AsyncTaskExecutor.class);
this.asyncManager = WebAsyncUtils.getAsyncManager(servletRequest);
this.asyncManager.setTaskExecutor(executor);
this.asyncManager.setAsyncWebRequest(this.asyncWebRequest);
}
@Test
public void startCallableProcessingErrorAndComplete() throws Exception {
StubCallable callable = new StubCallable();
CallableProcessingInterceptor interceptor = mock(CallableProcessingInterceptor.class);
Exception e = new Exception();
given(interceptor.handleError(this.asyncWebRequest, callable, e)).willReturn(RESULT_NONE);
this.asyncManager.registerCallableInterceptor("interceptor", interceptor);
this.asyncManager.startCallableProcessing(callable);
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
this.asyncWebRequest.onComplete(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(e, this.asyncManager.getConcurrentResult());
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable);
verify(interceptor).afterCompletion(this.asyncWebRequest, callable);
}
@Test
public void startCallableProcessingErrorAndResumeThroughCallback() throws Exception {
StubCallable callable = new StubCallable();
WebAsyncTask<Object> webAsyncTask = new WebAsyncTask<>(callable);
webAsyncTask.onError(new Callable<Object>() {
@Override
public Object call() throws Exception {
return 7;
}
});
this.asyncManager.startCallableProcessing(webAsyncTask);
Exception e = new Exception();
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(7, this.asyncManager.getConcurrentResult());
assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath());
}
@Test
public void startCallableProcessingErrorAndResumeThroughInterceptor() throws Exception {
StubCallable callable = new StubCallable();
CallableProcessingInterceptor interceptor = mock(CallableProcessingInterceptor.class);
Exception e = new Exception();
given(interceptor.handleError(this.asyncWebRequest, callable, e)).willReturn(22);
this.asyncManager.registerCallableInterceptor("errorInterceptor", interceptor);
this.asyncManager.startCallableProcessing(callable);
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(22, this.asyncManager.getConcurrentResult());
assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath());
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable);
}
@Test
public void startCallableProcessingAfterException() throws Exception {
StubCallable callable = new StubCallable();
Exception exception = new Exception();
CallableProcessingInterceptor interceptor = mock(CallableProcessingInterceptor.class);
Exception e = new Exception();
given(interceptor.handleError(this.asyncWebRequest, callable, e)).willThrow(exception);
this.asyncManager.registerCallableInterceptor("errorInterceptor", interceptor);
this.asyncManager.startCallableProcessing(callable);
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(exception, this.asyncManager.getConcurrentResult());
assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath());
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable);
}
@Test
public void startDeferredResultProcessingErrorAndComplete() throws Exception {
DeferredResult<Integer> deferredResult = new DeferredResult<>();
DeferredResultProcessingInterceptor interceptor = mock(DeferredResultProcessingInterceptor.class);
Exception e = new Exception();
given(interceptor.handleError(this.asyncWebRequest, deferredResult, e)).willReturn(true);
this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor);
this.asyncManager.startDeferredResultProcessing(deferredResult);
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
this.asyncWebRequest.onComplete(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(e, this.asyncManager.getConcurrentResult());
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, deferredResult);
verify(interceptor).preProcess(this.asyncWebRequest, deferredResult);
verify(interceptor).afterCompletion(this.asyncWebRequest, deferredResult);
}
@Test
public void startDeferredResultProcessingErrorAndResumeWithDefaultResult() throws Exception {
Exception e = new Exception();
DeferredResult<Throwable> deferredResult = new DeferredResult<>(null, e);
this.asyncManager.startDeferredResultProcessing(deferredResult);
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(e, this.asyncManager.getConcurrentResult());
assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath());
}
@Test
public void startDeferredResultProcessingErrorAndResumeThroughCallback() throws Exception {
final DeferredResult<Throwable> deferredResult = new DeferredResult<>();
deferredResult.onError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
deferredResult.setResult(t);
}
});
this.asyncManager.startDeferredResultProcessing(deferredResult);
Exception e = new Exception();
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(e, this.asyncManager.getConcurrentResult());
assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath());
}
@Test
public void startDeferredResultProcessingErrorAndResumeThroughInterceptor() throws Exception {
DeferredResult<Integer> deferredResult = new DeferredResult<>();
DeferredResultProcessingInterceptor interceptor = new DeferredResultProcessingInterceptorAdapter() {
@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> result, Throwable t)
throws Exception {
result.setErrorResult(t);
return true;
}
};
this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor);
this.asyncManager.startDeferredResultProcessing(deferredResult);
Exception e = new Exception();
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(e, this.asyncManager.getConcurrentResult());
assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath());
}
@Test
public void startDeferredResultProcessingAfterException() throws Exception {
DeferredResult<Integer> deferredResult = new DeferredResult<>();
final Exception exception = new Exception();
DeferredResultProcessingInterceptor interceptor = new DeferredResultProcessingInterceptorAdapter() {
@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> result, Throwable t)
throws Exception {
throw exception;
}
};
this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor);
this.asyncManager.startDeferredResultProcessing(deferredResult);
Exception e = new Exception();
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), e);
this.asyncWebRequest.onError(event);
assertTrue(this.asyncManager.hasConcurrentResult());
assertEquals(e, this.asyncManager.getConcurrentResult());
assertEquals("/test", ((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath());
}
private final class StubCallable implements Callable<Object> {
@Override
public Object call() throws Exception {
return 21;
}
}
}

View File

@ -17,6 +17,8 @@
package org.springframework.web.context.request.async;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import javax.servlet.http.HttpServletRequest;
import org.junit.Before;
@ -138,6 +140,7 @@ public class WebAsyncManagerTests {
verify(interceptor).postProcess(this.asyncWebRequest, task, concurrentResult);
}
@SuppressWarnings("unchecked")
@Test
public void startCallableProcessingBeforeConcurrentHandlingException() throws Exception {
Callable<Object> task = new StubCallable(21);
@ -159,6 +162,7 @@ public class WebAsyncManagerTests {
assertFalse(this.asyncManager.hasConcurrentResult());
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
}
@ -228,18 +232,19 @@ public class WebAsyncManagerTests {
verify(interceptor2).preProcess(this.asyncWebRequest, task);
}
@SuppressWarnings("unchecked")
@Test
public void startCallableProcessingWithAsyncTask() throws Exception {
AsyncTaskExecutor executor = mock(AsyncTaskExecutor.class);
given(this.asyncWebRequest.getNativeRequest(HttpServletRequest.class)).willReturn(this.servletRequest);
@SuppressWarnings("unchecked")
WebAsyncTask<Object> asyncTask = new WebAsyncTask<>(1000L, executor, mock(Callable.class));
this.asyncManager.startCallableProcessing(asyncTask);
verify(executor).submit((Runnable) notNull());
verify(this.asyncWebRequest).setTimeout(1000L);
verify(this.asyncWebRequest).addTimeoutHandler(any(Runnable.class));
verify(this.asyncWebRequest).addErrorHandler(any(Consumer.class));
verify(this.asyncWebRequest).addCompletionHandler(any(Runnable.class));
verify(this.asyncWebRequest).startAsync();
}
@ -277,6 +282,7 @@ public class WebAsyncManagerTests {
verify(this.asyncWebRequest).setTimeout(1000L);
}
@SuppressWarnings("unchecked")
@Test
public void startDeferredResultProcessingBeforeConcurrentHandlingException() throws Exception {
DeferredResult<Integer> deferredResult = new DeferredResult<>();
@ -298,6 +304,7 @@ public class WebAsyncManagerTests {
assertFalse(this.asyncManager.hasConcurrentResult());
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
}
@ -359,8 +366,10 @@ public class WebAsyncManagerTests {
given(this.asyncWebRequest.isAsyncComplete()).willReturn(false);
}
@SuppressWarnings("unchecked")
private void verifyDefaultAsyncScenario() {
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
verify(this.asyncWebRequest).startAsync();
verify(this.asyncWebRequest).dispatch();

View File

@ -219,6 +219,7 @@ class ReactiveTypeHandler {
terminate();
this.emitter.complete();
});
this.emitter.onError(t -> this.emitter.completeWithError(t));
subscription.request(1);
}

View File

@ -19,6 +19,7 @@ package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
@ -74,6 +75,8 @@ public class ResponseBodyEmitter {
private final DefaultCallback timeoutCallback = new DefaultCallback();
private final ErrorCallback errorCallback = new ErrorCallback();
private final DefaultCallback completionCallback = new DefaultCallback();
@ -123,6 +126,7 @@ public class ResponseBodyEmitter {
}
else {
this.handler.onTimeout(this.timeoutCallback);
this.handler.onError(this.errorCallback);
this.handler.onCompletion(this.completionCallback);
}
}
@ -168,11 +172,9 @@ public class ResponseBodyEmitter {
this.handler.send(object, mediaType);
}
catch (IOException ex) {
completeWithError(ex);
throw ex;
}
catch (Throwable ex) {
completeWithError(ex);
throw new IllegalStateException("Failed to send " + object, ex);
}
}
@ -214,6 +216,15 @@ public class ResponseBodyEmitter {
this.timeoutCallback.setDelegate(callback);
}
/**
* Register code to invoke when an error occurred while processing the async request.
* This method is called from a container thread when an error occurred while processing
* an async request.
*/
public synchronized void onError(Consumer<Throwable> callback) {
this.errorCallback.setDelegate(callback);
}
/**
* Register code to invoke when the async request completes. This method is
* called from a container thread when an async request completed for any
@ -244,6 +255,8 @@ public class ResponseBodyEmitter {
void onTimeout(Runnable callback);
void onError(Consumer<Throwable> callback);
void onCompletion(Runnable callback);
}
@ -291,4 +304,22 @@ public class ResponseBodyEmitter {
}
}
private class ErrorCallback implements Consumer<Throwable> {
private Consumer<Throwable> delegate;
public void setDelegate(Consumer<Throwable> callback) {
this.delegate = callback;
}
@Override
public void accept(Throwable t) {
ResponseBodyEmitter.this.complete = true;
if (this.delegate != null) {
this.delegate.accept(t);
}
}
}
}

View File

@ -19,6 +19,8 @@ package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.function.Consumer;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -217,6 +219,11 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
this.deferredResult.onTimeout(callback);
}
@Override
public void onError(Consumer<Throwable> callback) {
this.deferredResult.onError(callback);
}
@Override
public void onCompletion(Runnable callback) {
this.deferredResult.onCompletion(callback);

View File

@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.junit.Before;
@ -380,6 +381,10 @@ public class ReactiveTypeHandlerTests {
public void onTimeout(Runnable callback) {
}
@Override
public void onError(Consumer<Throwable> callback) {
}
@Override
public void onCompletion(Runnable callback) {
}

View File

@ -19,6 +19,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
@ -175,6 +176,25 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
verify(asyncWebRequest).startAsync();
}
@SuppressWarnings("unchecked")
@Test
public void responseBodyEmitterWithErrorValue() throws Exception {
AsyncWebRequest asyncWebRequest = mock(AsyncWebRequest.class);
WebAsyncUtils.getAsyncManager(this.request).setAsyncWebRequest(asyncWebRequest);
ResponseBodyEmitter emitter = new ResponseBodyEmitter(19000L);
emitter.onError(mock(Consumer.class));
emitter.onCompletion(mock(Runnable.class));
MethodParameter type = on(TestController.class).resolveReturnType(ResponseBodyEmitter.class);
this.handler.handleReturnValue(emitter, type, this.mavContainer, this.webRequest);
verify(asyncWebRequest).addErrorHandler(any(Consumer.class));
verify(asyncWebRequest, times(2)).addCompletionHandler(any(Runnable.class));
verify(asyncWebRequest).startAsync();
}
@Test
public void sseEmitter() throws Exception {
MethodParameter type = on(TestController.class).resolveReturnType(SseEmitter.class);

View File

@ -102,6 +102,7 @@ public class ResponseBodyEmitterTests {
public void sendAfterHandlerInitialized() throws Exception {
this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onError(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler);
@ -119,6 +120,7 @@ public class ResponseBodyEmitterTests {
public void sendAfterHandlerInitializedWithError() throws Exception {
this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onError(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler);
@ -137,6 +139,7 @@ public class ResponseBodyEmitterTests {
public void sendWithError() throws Exception {
this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onError(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler);
@ -150,7 +153,6 @@ public class ResponseBodyEmitterTests {
// expected
}
verify(this.handler).send("foo", MediaType.TEXT_PLAIN);
verify(this.handler).completeWithError(failure);
verifyNoMoreInteractions(this.handler);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
@ -152,6 +153,10 @@ public class SseEmitterTests {
public void onTimeout(Runnable callback) {
}
@Override
public void onError(Consumer<Throwable> callback) {
}
@Override
public void onCompletion(Runnable callback) {
}